# Tech Stocks Data Prep

Code to prepare portions of the stocks data for the InfluxDB demo.

## Setup and Imports

In [1]:
import pandas as pd
import os
from datetime import datetime, date
from typing import Iterable, List
import csv

## Stock Subset

Take only the stock trades dated from `FILTER_MIN_DATE` (inclusive) to `FILTER_MAX_DATE` (exclusive) — currently August 2018.

In [3]:
# Filter the large trades CSV down to a date window, convert the trade
# timestamps to unix nanoseconds (optionally shifted so the newest trade lands
# at "now"), and write the result to the output data file.

# configuration vars
SOURCE_LARGE_STOCKS_DATAFILE = r"../../../data-generation/stocks/data/tech_trades.csv"
OUTPUT_FILTERED_STOCK_DATAFILE = r"../influxdb-docker/volumes/data/trades-md.tar.gz"
# half-open window: FILTER_MIN_DATE <= date < FILTER_MAX_DATE
FILTER_MIN_DATE: datetime = datetime(2018, 8, 1)
FILTER_MAX_DATE: datetime = datetime(2018, 9, 1)
FILTER_DATE_RANGE = (FILTER_MIN_DATE, FILTER_MAX_DATE)
OFFSET_TIMESTAMPS = True        # to offset the timestamps to start from now and go backwards


# check to see if input file exists (explicit raise: asserts are stripped under `python -O`)
if not os.path.exists(SOURCE_LARGE_STOCKS_DATAFILE):
    raise FileNotFoundError(f"Original stocks data file does NOT exist: {SOURCE_LARGE_STOCKS_DATAFILE}")
# create the output folder if it doesn't exist
output_dir = os.path.dirname(OUTPUT_FILTERED_STOCK_DATAFILE)
if output_dir and not os.path.isdir(output_dir):
    print(f"Creating output folder: {output_dir}")
    # NOTE(review): 0o740 gives the group read but not traverse permission on the folder - confirm intended
    os.makedirs(output_dir, mode=0o740, exist_ok=True)

# read source csv in chunks so the whole file never has to fit in memory
chunks: Iterable[pd.DataFrame] = pd.read_csv(SOURCE_LARGE_STOCKS_DATAFILE, chunksize=100_000, on_bad_lines='skip')
filtered_dfs: List[pd.DataFrame] = []   # buffer to hold filtered df chunks
total_records = 0
filtered_records = 0
print("Reading large source by chunks.", end='')
for chunk in chunks:
    # convert datetime cols; unparseable dates become NaT and fail both comparisons below
    chunk['date'] = pd.to_datetime(chunk['date'], errors='coerce', format='%Y-%m-%d')
    # add up total records processed
    total_records += len(chunk.index)
    # filter by date range
    in_range = chunk[(chunk['date'] >= FILTER_MIN_DATE) & (chunk['date'] < FILTER_MAX_DATE)]
    if len(in_range.index):
        filtered_records += len(in_range.index)
        filtered_dfs.append(in_range.copy())
    # print a trailing process dot
    print(".", end='')
# process done. print newline & progress report
print(f"\ntotal records: {total_records:,} - filtered records: {filtered_records:,}")
# fail with a clear message instead of pd.concat's "No objects to concatenate"
if not filtered_dfs:
    raise ValueError(
        f"No records found with date in [{FILTER_MIN_DATE}, {FILTER_MAX_DATE}) "
        f"in {SOURCE_LARGE_STOCKS_DATAFILE}"
    )
# concatenate dataframes
print(f"concatenating {len(filtered_dfs)} dataframes...")
df = pd.concat(filtered_dfs, ignore_index=True)
# display resulting dataframe info
print(f"shape: {df.shape}")
print(f"date min: {df['date'].min()} - max: {df['date'].max()}")
# post-processing
print("sorting and post-processing....")
# parse the timestamps first so the sort is chronological rather than lexicographic
df['trade_timestamp'] = pd.to_datetime(df['trade_timestamp'], errors='coerce', format=r'%Y-%m-%d %H:%M:%S')
# NaT has no valid integer representation, so rows whose timestamp failed to
# parse must be dropped before the int64 conversion and the max()-based offset
bad_timestamps = int(df['trade_timestamp'].isna().sum())
if bad_timestamps:
    print(f"dropping {bad_timestamps:,} records with unparseable trade_timestamp")
    df = df.dropna(subset=['trade_timestamp'])
    if df.empty:
        raise ValueError("No records with a parseable trade_timestamp remain after filtering")
df = df.sort_values(by=['trade_timestamp', 'ticker'], ignore_index=True)
# dropping cols
df = df.drop(columns=['trade_id'], errors='ignore')
# convert the timestamp to unix nano seconds; the resolution is pinned to ns so
# the integer values are nanoseconds whatever unit pandas inferred while parsing
df['trade_timestamp'] = df['trade_timestamp'].astype('datetime64[ns]').astype('int64')
# offset the records timestamps to start from now() and go backwards in time
if OFFSET_TIMESTAMPS:
    # calculate the offset between the largest timestamp and now
    offset = int(datetime.now().timestamp() * 1_000_000_000) - df['trade_timestamp'].max()
    # shift all timestamps up
    df['trade_timestamp'] += offset
# write output file; compression is inferred from the output file extension
print(f"writing to output file: {OUTPUT_FILTERED_STOCK_DATAFILE}")
df.to_csv(OUTPUT_FILTERED_STOCK_DATAFILE, mode='w', index=False, compression='infer', quoting=csv.QUOTE_MINIMAL)


Reading large source by chunks..............................................
total records: 4,449,431 - filtered records: 53,890
concatenating 2 dataframes...
shape: (53890, 13)
date min: 2018-08-01 00:00:00 - max: 2018-08-24 00:00:00
sorting and post-processing....
writing to output file: ../influxdb-docker/volumes/data/trades-md.tar.gz
