In [8]:
# ! pip install pandas dask dask[diagnostics] numpy tomli tqdm pyarrow fastparquet

In [9]:
import pyarrow.parquet as pq
import pyarrow as pa
from tqdm import trange
import dask.dataframe as dd
import pandas as pd
import numpy as np
import tomli
import os

In [10]:
# Load Config
CONFIG_FILE_PATH = "../config.tomli"

with open(CONFIG_FILE_PATH, 'rb') as config_file:
    config = tomli.load(config_file)

ETF_DATA_DRIVE_PATH = f"../{config['data']['etfs']}"
STOCK_DATA_DRIVE_PATH = f"../{config['data']['stocks']}"
PROCESSED_DATA_DRIVE_PATH = f"../{config['data']['processed']}"
SYMBOLS_FILE_PATH = f"../{config['data']['symbols']}"

DATASET_PATH = f"{PROCESSED_DATA_DRIVE_PATH}/dataset.parquet"

data_dtypes = config['etf_stock_data_type']
symbols_dtype = config['symbols_data_types']

date_format = config['format']['date_format']

In [11]:
%time
# load symbols info and index for future query
symbols_df = pd.read_csv(
    SYMBOLS_FILE_PATH, 
    dtype=symbols_dtype, 
    index_col='Symbol'
)
# drop other columns to conserve space
symbols_df.drop(
    [
        'Nasdaq Traded', 
        'Listing Exchange',
        'Market Category',
        'Round Lot Size',
        'Test Issue',
        'Financial Status',
        # 'CQS Symbol',
        # 'NASDAQ Symbol',
        'NextShares'
    ],
    axis=1,
    inplace=True
)
symbols_df.head()

CPU times: total: 0 ns
Wall time: 0 ns


Unnamed: 0_level_0,Security Name,ETF,CQS Symbol,NASDAQ Symbol
Symbol,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
A,"Agilent Technologies, Inc. Common Stock",N,A,A
AA,Alcoa Corporation Common Stock,N,AA,AA
AAAU,Perth Mint Physical Gold ETF,Y,AAAU,AAAU
AACG,ATA Creativity Global - American Depositary Sh...,N,,AACG
AADR,AdvisorShares Dorsey Wright ADR ETF,Y,AADR,AADR


Note: 
* This can be stored in a RDBMS which will save the redundant column space ('Symbol', 'Security Name') but going with parquet since it is mentioned in the readme.
* Additianl steps can be added to segment the load if RAM utilization is an issue. Since this has not been specified I am working under the assumption that there are no memory constraints.


In [12]:
def get_file_name(index:int, file_list:list):
    return file_list[index]

def get_file_path(file_name, base_path:str):
    return f"{base_path}/{file_name}"

def get_symbol(file_name:str):
    return file_name.split(".")[0]

def get_security_name(symbol:str, symbols_df:pd.DataFrame):
    try:
        security_name = symbols_df.loc[symbol]['Security Name'] 
        return security_name
    except KeyError:
        print(f"unable to extract symbol name for {symbol}")
        return None  

def get_security_df(file_path:str, symbol:str, security_name:str)->pd.DataFrame:
    # read file
    security_df = pd.read_csv(file_path, parse_dates=['Date']) 
    # parse date format                      
    security_df['Date'] = security_df['Date'].dt.strftime(date_format)  
    # dropping null or inf values
    security_df = security_df.replace([np.inf, -np.inf], np.nan).dropna()   
    # set datatypes
    security_df['Open'] = security_df['Open'].astype(float)
    security_df['High'] = security_df['High'].astype(float)
    security_df['Low'] = security_df['Low'].astype(float)
    security_df['Close'] = security_df['Close'].astype(float)
    security_df['Adj Close'] = security_df['Adj Close'].astype(float)
    security_df['Volume'] = security_df['Volume'].round().astype(int)    
    # Add additional columns
    # note:     
    # This can be stored in a RDBMS format (SQL) 
    # which will save the redundant column space ('Symbol', 'Security Name') 
    # Since parquet is suggested, it was not considered 
    security_df['Symbol'] = symbol                                                      
    security_df['Security Name'] = security_name    
    return security_df                      

def get_dataset(base_paths:list, symbols_df:pd.DataFrame):
    # init local variables 
    dataset_dfs = []

    # loop over all base paths 
    for base_path in base_paths:
        print(f"Started extraction for directory {base_path}")
        files = sorted(os.listdir(base_path))
        file_count = len(files)
        # loop over all securities in the folder
        for file_index in trange(file_count, unit="file"):
            file_name = get_file_name(
                index=file_index,
                file_list=files
            )
            file_path = get_file_path(
                file_name=file_name, 
                base_path=base_path
            )
            symbol = get_symbol(file_name=file_name)
            security_name = get_security_name(
                symbol=symbol, 
                symbols_df=symbols_df
            )
            # if security info is found continue
            if security_name is not None:
                security_df = get_security_df(
                    file_path=file_path, 
                    symbol=symbol, 
                    security_name=security_name
                )
                dataset_dfs.append(security_df) 
    
    # merge all securities data
    dataset_df = pd.concat(dataset_dfs)
    del dataset_dfs
    return dataset_df
            

In [13]:
%time
# init data directories
base_paths = [ETF_DATA_DRIVE_PATH, STOCK_DATA_DRIVE_PATH]
# get dataset
dataset_df = get_dataset(
    base_paths=base_paths, 
    symbols_df=symbols_df
)
# Save dataset to parquet
dataset_df.to_parquet(DATASET_PATH, index=False)

CPU times: total: 0 ns
Wall time: 0 ns
Started extraction for directory .././data/etfs


100%|██████████| 2165/2165 [00:38<00:00, 56.97file/s]


Started extraction for directory .././data/stocks


  2%|▏         | 140/5884 [00:04<02:14, 42.84file/s]

unable to extract symbol name for AGM-A


 15%|█▌        | 899/5884 [00:25<01:22, 60.23file/s]

unable to extract symbol name for CARR#


 53%|█████▎    | 3114/5884 [01:14<00:53, 51.71file/s]

unable to extract symbol name for LGF


 93%|█████████▎| 5494/5884 [02:07<00:09, 43.28file/s]

unable to extract symbol name for UTX#


100%|██████████| 5884/5884 [02:16<00:00, 43.10file/s]


In [14]:
# ! pip freeze > ../requirements.txt