In [108]:
%load_ext dotenv
%dotenv 


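The `%dotenv` magic from python-dotenv loads the key-value pairs in `.env` into `os.environ`. `os.getenv()` returns `None` for a missing key, so a typo in `.env` usually surfaces much later as a confusing `TypeError` inside an `os.path` call. Below is a minimal sketch of a fail-fast helper. `require_env` is a hypothetical name and is not part of the course modules.

```python
import os


def require_env(name: str) -> str:
    """Return the value of environment variable `name`, or raise a clear error.

    Hypothetical helper: os.getenv() silently returns None for missing keys,
    which otherwise propagates into os.path calls as a TypeError.
    """
    value = os.getenv(name)
    if value is None or value == "":
        raise RuntimeError(f"Environment variable {name!r} is not set; check your .env file.")
    return value


# Usage (assumes SRC_DIR is defined in .env and loaded with %dotenv):
# src_dir = require_env("SRC_DIR")
```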

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture we have in mind is called Medallion by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) approach, even though our data is already well-structured.

![Medallion Architecture (Databricks)](./images/02_medallion_architecture.png)

+ In our case, we would like to minimize the number of times that we download data from the internet.
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./images/02_target_pipeline_manager.png)
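Downloading as rarely as possible can be sketched as a read-through cache: look for a local (bronze) copy first and call the downloader only when that file is missing. This is a minimal sketch, not the course's pipeline manager. `load_or_download` and the `download` callable are hypothetical; a real downloader would call yfinance.

```python
import os
from typing import Callable

import pandas as pd


def load_or_download(ticker: str, cache_dir: str,
                     download: Callable[[str], pd.DataFrame]) -> pd.DataFrame:
    """Return prices for `ticker`, downloading only if no cached CSV exists (hypothetical helper)."""
    path = os.path.join(cache_dir, f"{ticker}.csv")
    if os.path.exists(path):
        # Cache hit: read the local copy instead of going to the network.
        return pd.read_csv(path, parse_dates=["Date"])
    # Cache miss: download once and persist the raw data locally.
    df = download(ticker)
    os.makedirs(cache_dir, exist_ok=True)
    df.to_csv(path, index=False)
    return df
```

Calling `load_or_download` twice for the same ticker triggers only one download. The second call reads the CSV written by the first.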

# Download Data

Download the [Stock Market Dataset from Kaggle](https://www.kaggle.com/datasets/jacksoncrow/stock-market-dataset). Note that you may be required to register for a free account.

Extract all files into the directory: `./05_src/data/prices_csv/`

Your folder structure should include the following paths:

+ `05_src/data/prices_csv/etfs`
+ `05_src/data/prices_csv/stocks`


In [109]:
import pandas as pd
import os
import sys
from glob import glob


sys.path.append(os.getenv('SRC_DIR'))

from utils.logger import get_logger
_logs = get_logger(__name__)

In [110]:
print(sys.path)
print(os.getenv('SRC_DIR'))
print(os.path.exists(os.getenv('SRC_DIR')))

['/home/labber/miniconda3/envs/dsi_participant/lib/python39.zip', '/home/labber/miniconda3/envs/dsi_participant/lib/python3.9', '/home/labber/miniconda3/envs/dsi_participant/lib/python3.9/lib-dynload', '', '/home/labber/miniconda3/envs/dsi_participant/lib/python3.9/site-packages', None, '../../05_src/', '../../05_src/', '../../05_src/', '../../05_src/']
../../05_src/
True


A few things to notice in the code chunk above:

+ Libraries are imported first, ordered from high-level to low-level. Third-party libraries come from the package manager (pip in this case, but it could be conda, poetry, etc.).
+ The command `sys.path.append(os.getenv('SRC_DIR'))` adds the directory stored in the `SRC_DIR` environment variable (here, `../../05_src/`) to the path of the notebook's kernel. This lets us import our own modules in the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.
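The module `utils/logger.py` is not reproduced in this notebook. Based only on the log lines the notebook prints (timestamp, file name, line number, level, message), a `get_logger()` along the lines below would produce a similar format. Treat it as a hypothetical reconstruction, not the course's implementation.

```python
import logging
import sys


def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Hypothetical sketch of utils.logger.get_logger()."""
    logger = logging.getLogger(name)
    # Avoid attaching duplicate handlers when a notebook cell is re-run.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s, %(filename)s, %(lineno)s, %(levelname)s, %(message)s"))
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger


_logs = get_logger(__name__)
_logs.info("Logger ready.")
```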

Now, to load the historical price data for stocks and ETFs, we could use:

In [111]:
# List every CSV file in the stocks folder; SRC_DIR comes from the .env file.
glob(os.path.join(os.getenv('SRC_DIR'), "data/prices_csv/stocks/*.csv"))

['../../05_src/data/prices_csv/stocks/MAC.csv',
 '../../05_src/data/prices_csv/stocks/BCEL.csv',
 '../../05_src/data/prices_csv/stocks/ACNB.csv',
 '../../05_src/data/prices_csv/stocks/CHDN.csv',
 '../../05_src/data/prices_csv/stocks/BABA.csv',
 '../../05_src/data/prices_csv/stocks/WU.csv',
 '../../05_src/data/prices_csv/stocks/FONR.csv',
 '../../05_src/data/prices_csv/stocks/UNT.csv',
 '../../05_src/data/prices_csv/stocks/STXS.csv',
 '../../05_src/data/prices_csv/stocks/INFY.csv',
 '../../05_src/data/prices_csv/stocks/AKRO.csv',
 '../../05_src/data/prices_csv/stocks/BRFS.csv',
 '../../05_src/data/prices_csv/stocks/GMED.csv',
 '../../05_src/data/prices_csv/stocks/ASTE.csv',
 '../../05_src/data/prices_csv/stocks/DO.csv',
 '../../05_src/data/prices_csv/stocks/NVTA.csv',
 '../../05_src/data/prices_csv/stocks/YVR.csv',
 '../../05_src/data/prices_csv/stocks/NMRD.csv',
 '../../05_src/data/prices_csv/stocks/SDC.csv',
 '../../05_src/data/prices_csv/stocks/ANGI.csv',
 '../../05_src/data/prices_c

In [112]:
import random
stock_files = glob(os.path.join(os.getenv('SRC_DIR'), "data/prices_csv/stocks/*.csv"))


random.seed(42)
stock_files = random.sample(stock_files, 1)

dt_list = []
for s_file in stock_files:
    _logs.info(f"Reading file: {s_file}")
    dt = pd.read_csv(s_file).assign(
        source = os.path.basename(s_file),
        ticker = os.path.basename(s_file).replace('.csv', ''),
        Date = lambda x: pd.to_datetime(x['Date'])
    )
    dt_list.append(dt)
stock_prices = pd.concat(dt_list, axis = 0, ignore_index = True)
print(stock_prices)

2025-06-11 00:29:53,227, 342909591.py, 10, INFO, Reading file: ../../05_src/data/prices_csv/stocks/IAA.csv


           Date       Open       High        Low      Close  Adj Close  \
0    2008-05-22   1.860000   1.950000   1.860000   1.950000   1.950000   
1    2008-05-23   1.950000   2.040000   1.950000   2.040000   2.040000   
2    2008-05-27   2.140000   2.230000   2.140000   2.230000   2.230000   
3    2008-05-28   2.230000   2.340000   2.230000   2.340000   2.340000   
4    2008-05-30   2.340000   2.340000   2.230000   2.230000   2.230000   
...         ...        ...        ...        ...        ...        ...   
2581 2020-03-26  30.889999  33.660000  29.280001  32.730000  32.730000   
2582 2020-03-27  32.459999  32.779999  29.920000  32.310001  32.310001   
2583 2020-03-30  32.570000  33.450001  31.530001  31.930000  31.930000   
2584 2020-03-31  32.119999  32.630001  29.139999  29.959999  29.959999   
2585 2020-04-01  29.240000  30.020000  27.219999  27.590000  27.590000   

          Volume   source ticker  
0       66833800  IAA.csv    IAA  
1     1138641200  IAA.csv    IAA  
2     

In [99]:
import random

# Read a random sample of 60 stock price CSV files and combine them into a single DataFrame.
# Requires the SRC_DIR environment variable (set in .env) to point to the 05_src directory.

# glob() returns the paths of all files in data/prices_csv/stocks/ whose names end in .csv.
# Note: glob's ordering depends on the file system, so the sample may differ across machines.
stock_files = glob(os.path.join(os.getenv('SRC_DIR'), "data/prices_csv/stocks/*.csv"))

# Fix the seed so that the same files are sampled on every run (on the same machine).
random.seed(42)
stock_files = random.sample(stock_files, 60)

dt_list = []
for s_file in stock_files:
    _logs.info(f"Reading file: {s_file}")
    # Add a 'source' column (file name) and a 'ticker' column (file name without .csv),
    # and parse 'Date' as datetime for easier manipulation.
    dt = pd.read_csv(s_file).assign(
        source = os.path.basename(s_file),
        ticker = os.path.basename(s_file).replace('.csv', ''),
        Date = lambda x: pd.to_datetime(x['Date'])
    )
    dt_list.append(dt)

# Concatenate once at the end instead of inside the loop: concatenating one file at a time
# copies the growing DataFrame on every iteration, which is slow and memory-intensive.
stock_prices = pd.concat(dt_list, axis = 0, ignore_index = True)



2025-06-11 00:29:17,514, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/IAA.csv
2025-06-11 00:29:17,613, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/LN.csv
2025-06-11 00:29:17,726, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/BTU.csv
2025-06-11 00:29:17,883, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/WMGI.csv
2025-06-11 00:29:18,299, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/CFXA.csv
2025-06-11 00:29:18,394, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/ICCH.csv
2025-06-11 00:29:18,488, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/CNX.csv
2025-06-11 00:29:18,832, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/KELYA.csv
2025-06-11 00:29:19,287, 4017565964.py, 21, INFO, Reading file: ../../05_src/data/prices_csv/stocks/MLPO.csv
2025-06-11 00:29:19,415
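Here is the read-tag-concatenate pattern reduced to a runnable example. It uses two made-up CSV files in a temporary directory; the file names and values are illustrative. Sorting the `glob` result makes the file order deterministic, because `glob` returns paths in an order that depends on the file system.

```python
import os
import tempfile
from glob import glob

import pandas as pd

with tempfile.TemporaryDirectory() as stocks_dir:
    # Two tiny CSVs that mimic the Kaggle layout (Date, Close); values are made up.
    pd.DataFrame({"Date": ["2020-01-02", "2020-01-03"], "Close": [1.0, 1.1]}).to_csv(
        os.path.join(stocks_dir, "AAA.csv"), index=False)
    pd.DataFrame({"Date": ["2020-01-02"], "Close": [5.0]}).to_csv(
        os.path.join(stocks_dir, "BBB.csv"), index=False)

    # Read each file, tag it with its ticker, collect in a list, and concatenate once.
    frames = [
        pd.read_csv(f).assign(
            ticker=os.path.basename(f).replace(".csv", ""),
            Date=lambda x: pd.to_datetime(x["Date"]),
        )
        for f in sorted(glob(os.path.join(stocks_dir, "*.csv")))
    ]
    combined = pd.concat(frames, axis=0, ignore_index=True)

print(combined)
```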

In [100]:
stock_prices

|        | Date       | Open | High  | Low  | Close | Adj Close | Volume       | source  | ticker |
|--------|------------|------|-------|------|-------|-----------|--------------|---------|--------|
| 0      | 2008-05-22 | 1.86 | 1.950 | 1.86 | 1.950 | 1.950     | 6.683380e+07 | IAA.csv | IAA    |
| 1      | 2008-05-23 | 1.95 | 2.040 | 1.95 | 2.040 | 2.040     | 1.138641e+09 | IAA.csv | IAA    |
| 2      | 2008-05-27 | 2.14 | 2.230 | 2.14 | 2.230 | 2.230     | 1.220143e+09 | IAA.csv | IAA    |
| 3      | 2008-05-28 | 2.23 | 2.340 | 2.23 | 2.340 | 2.340     | 1.163506e+09 | IAA.csv | IAA    |
| 4      | 2008-05-30 | 2.34 | 2.340 | 2.23 | 2.230 | 2.230     | 2.991435e+08 | IAA.csv | IAA    |
| ...    | ...        | ...  | ...   | ...  | ...   | ...       | ...          | ...     | ...    |
| 245470 | 2020-03-26 | 0.70 | 0.892 | 0.68 | 0.848 | 0.848     | 5.990000e+04 | CUI.csv | CUI    |
| 245471 | 2020-03-27 | 0.85 | 0.899 | 0.71 | 0.860 | 0.860     | 8.390000e+04 | CUI.csv | CUI    |
| 245472 | 2020-03-30 | 0.96 | 0.960 | 0.88 | 0.930 | 0.930     | 7.620000e+04 | CUI.csv | CUI    |
| 245473 | 2020-03-31 | 0.94 | 0.980 | 0.82 | 0.891 | 0.891     | 1.385000e+05 | CUI.csv | CUI    |


Verify the structure of the `stock_prices` data:

In [101]:
stock_prices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 245475 entries, 0 to 245474
Data columns (total 9 columns):
 #   Column     Non-Null Count   Dtype         
---  ------     --------------   -----         
 0   Date       245475 non-null  datetime64[ns]
 1   Open       245452 non-null  float64       
 2   High       245452 non-null  float64       
 3   Low        245452 non-null  float64       
 4   Close      245452 non-null  float64       
 5   Adj Close  245452 non-null  float64       
 6   Volume     245452 non-null  float64       
 7   source     245475 non-null  object        
 8   ticker     245475 non-null  object        
dtypes: datetime64[ns](1), float64(6), object(2)
memory usage: 16.9+ MB


We can subset our ticker data set using standard indexing techniques. Good references for this type of data manipulation are the pandas [documentation](https://pandas.pydata.org/docs/user_guide/indexing.html#indexing-and-selecting-data) and [cookbook](https://pandas.pydata.org/docs/user_guide/cookbook.html#cookbook-selection).

Select the `ticker` column, keep its unique values, and convert them to a list.

In [102]:
select_tickers = stock_prices['ticker'].unique().tolist()
select_tickers

['IAA',
 'LN',
 'BTU',
 'WMGI',
 'CFXA',
 'ICCH',
 'CNX',
 'KELYA',
 'MLPO',
 'GFY',
 'SQBG',
 'TBLT',
 'ATMP',
 'CO',
 'PPIH',
 'AAXN',
 'BBX',
 'GFLU',
 'PMM',
 'IMTE',
 'WTREP',
 'POAI',
 'LINX',
 'SGMO',
 'FNGU',
 'KO',
 'CARE',
 'XRF',
 'FRAF',
 'NSC',
 'YJ',
 'NZF',
 'ERII',
 'GSV',
 'NCR',
 'IPHA',
 'NVRO',
 'MEET',
 'AVB',
 'IESC',
 'AGIO',
 'CSPI',
 'DXR',
 'WHG',
 'HALO',
 'ERF',
 'NAT',
 'ATRA',
 'WIA',
 'AGM-A',
 'JOF',
 'EAE',
 'NTGR',
 'CSCO',
 'PKI',
 'SBH',
 'AEL',
 'GXGX',
 'FT',
 'CUI']
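The example below shows the subsetting step on its own, using a made-up frame. It filters rows with a boolean mask built by `Series.isin()` and then extracts the unique tickers. `unique()` keeps values in order of first appearance, which is why `select_tickers` above follows the sampled file order rather than alphabetical order.

```python
import pandas as pd

# Toy frame standing in for stock_prices (made-up values).
prices = pd.DataFrame({
    "ticker": ["KO", "KO", "CSCO", "NSC"],
    "Close": [50.1, 50.3, 40.2, 150.0],
})

# Boolean mask with isin(): keep only rows whose ticker is in the list.
subset = prices[prices["ticker"].isin(["KO", "CSCO"])]

# unique() keeps first-appearance order; tolist() converts the NumPy array to a list.
tickers = subset["ticker"].unique().tolist()
print(tickers)  # ['KO', 'CSCO']
```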

# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.

In [103]:
path = '.'
with os.scandir(path) as entries:
    for entry in entries:
        print(entry)

<DirEntry '05_hyperparams.ipynb'>
<DirEntry 'images'>
<DirEntry '.env'>
<DirEntry '03a_sampling.ipynb'>
<DirEntry '07_distribution_shifts.ipynb'>
<DirEntry '02_data_engineering.ipynb'>
<DirEntry '06_explainability.ipynb'>
<DirEntry '01_setup.ipynb'>
<DirEntry '04_transforms.ipynb'>
<DirEntry '03b_pipeline.ipynb'>


In [104]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [105]:
import time
import shutil

In [106]:
temp = os.getenv("TEMP_DATA")
csv_dir = os.path.join(temp, "csv")
print(csv_dir)
shutil.rmtree(csv_dir, ignore_errors=True)
stock_csv = os.path.join(csv_dir, "stock_px.csv")
os.makedirs(csv_dir, exist_ok=True)

../../05_src/data/temp/csv


In [114]:

start = time.time()
stock_prices.to_csv(stock_csv, index = False)
end = time.time()

_logs.info(f'Writing data ({stock_prices.shape}) to csv took {end - start} seconds.')
_logs.info(f'CSV file size { os.path.getsize(stock_csv)*1e-6 } MB')

2025-06-11 00:29:58,517, 473192941.py, 5, INFO, Writing data ((2586, 9)) to csv took 0.04229283332824707 seconds.
2025-06-11 00:29:58,519, 473192941.py, 6, INFO, CSV file size 0.141147 MB
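The `start`/`end` pattern above has to be repeated for every format we benchmark. A small context manager can keep the measurement in one place. `timed` is a hypothetical helper and not part of the course modules. It uses `time.perf_counter()`, which is intended for measuring short durations, rather than `time.time()`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label: str, results: dict):
    """Store the wall-clock seconds spent inside the `with` block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so partial timings are still recorded.
        results[label] = time.perf_counter() - start


# Usage with the objects defined above:
# timings = {}
# with timed("csv", timings):
#     stock_prices.to_csv(stock_csv, index=False)
```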


## Save Data to Parquet

### Dask 

With Dask, we can work with large data sets and parquet files. Recent versions of pandas also support pyarrow data types, and the pandas developers have proposed making pyarrow a required dependency in future versions. The pyarrow library is the Python interface to the Apache Arrow project. Both the [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [120]:
import dask
print(dask.__version__)

import sys
print(sys.version)

2024.2.1
3.9.21 (main, Dec 11 2024, 16:24:11) 
[GCC 11.2.0]


In [117]:
import dask.dataframe as dd

parquet_dir = os.path.join(temp, "parquet")
shutil.rmtree(parquet_dir, ignore_errors=True)
os.makedirs(parquet_dir, exist_ok=True)


In [None]:
px_dd = dd.from_pandas(stock_prices, npartitions = len(select_tickers))

start = time.time()
px_dd.to_parquet(parquet_dir, engine = "pyarrow")
end = time.time()

_logs.info(f'Writing dd ({stock_prices.shape}) to parquet took {end - start} seconds.')
_logs.info(f'Parquet file size { get_dir_size(parquet_dir)*1e-6 } MB')


### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a convenient way to manipulate data stored in parquet files.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases such as small-to-large joins, where the small DataFrame fits in memory but the large one does not.
    - Use pandas when possible: reduce the data with Dask first, then continue in pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: a well-defined index in a Dask DataFrame can speed up searching (filtering) the data. Only a one-dimensional index is supported.
    - Avoid (or minimize) full-data shuffling: setting an index is an expensive operation.
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We could store all our data as a single blob, but this is difficult to maintain because parquet files are immutable: any update means rewriting the whole file.
+ Strategy: organize data files by ticker and year. When new data arrives, only the files for the most recent period need to be rewritten.
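With this layout, deciding what to rewrite comes down to finding the (ticker, year) pairs present in a batch of new rows. Below is a minimal sketch with hypothetical helper names. The path convention mirrors the `os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")` call in the loop below.

```python
import os
from datetime import date
from typing import Iterable, Set, Tuple


def partition_path(root: str, ticker: str, year: int) -> str:
    """Directory for one ticker-year partition: <root>/<ticker>/<ticker>_<year> (hypothetical helper)."""
    return os.path.join(root, ticker, f"{ticker}_{year}")


def partitions_to_rewrite(new_rows: Iterable[Tuple[str, date]]) -> Set[Tuple[str, int]]:
    """Return the (ticker, year) partitions touched by a batch of new (ticker, date) rows."""
    return {(ticker, d.year) for ticker, d in new_rows}


new = [("KO", date(2020, 3, 31)), ("KO", date(2020, 4, 1)), ("CSCO", date(2020, 4, 1))]
print(sorted(partitions_to_rewrite(new)))  # [('CSCO', 2020), ('KO', 2020)]
```

Only the two 2020 partitions would be rewritten. Every older year stays untouched.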



In [None]:
# Clean up before starting
PRICE_DATA = os.getenv("PRICE_DATA")
import shutil
if os.path.exists(PRICE_DATA):
    shutil.rmtree(PRICE_DATA)

In [None]:
stock_prices.columns

Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source',
       'ticker'],
      dtype='object')

In [None]:
for ticker in stock_prices['ticker'].unique():
    ticker_dt = stock_prices[stock_prices['ticker'] == ticker]
    ticker_dt = ticker_dt.assign(Year = ticker_dt.Date.dt.year)
    for yr in ticker_dt['Year'].unique():
        yr_dd = dd.from_pandas(ticker_dt[ticker_dt['Year'] == yr], npartitions = 2)
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        yr_dd.to_parquet(yr_path, engine = "pyarrow")
    


Why would we want to store data this way?

+ Easier to maintain: we never rewrite old data, only recent data.
+ We can still read all the files at once, as shown below.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.
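The recursive `**` pattern is what lets a single `glob` call collect files from every ticker and year folder. The example below builds a throwaway directory tree with a layout similar to what the storage loop writes (`part.N.parquet` files inside `<ticker>/<ticker>_<year>` folders). The files are empty placeholders because only their paths matter here.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as root:
    # Empty placeholder files in a <ticker>/<ticker>_<year>/ layout.
    for rel in ["KO/KO_2019/part.0.parquet", "KO/KO_2020/part.0.parquet",
                "CSCO/CSCO_2020/part.0.parquet"]:
        path = os.path.join(root, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "wb").close()

    # With recursive=True, "**" matches any number of nested directories.
    # sorted() because glob's order depends on the file system.
    files = sorted(glob.glob(os.path.join(root, "**", "*.parquet"), recursive=True))

print(len(files))  # 3
```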

In [None]:
from glob import glob

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.
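The same returns logic written in plain pandas makes the role of row order explicit. `shift(1)` returns the previous *row* within each group, so rows must be sorted by date first. The toy frame below uses made-up prices and is deliberately out of order.

```python
import pandas as pd

# Toy prices for two tickers; rows are deliberately out of date order.
px = pd.DataFrame({
    "ticker": ["A", "A", "A", "B", "B"],
    "Date": pd.to_datetime(["2020-01-03", "2020-01-01", "2020-01-02",
                            "2020-01-01", "2020-01-02"]),
    "Close": [12.1, 10.0, 11.0, 50.0, 45.0],
})

# Sort first: shift(1) takes the previous row, so order must be chronological per ticker.
px = px.sort_values(["ticker", "Date"])
px["Close_lag_1"] = px.groupby("ticker")["Close"].shift(1)
px["Returns"] = px["Close"] / px["Close_lag_1"] - 1
print(px)
```

The first row of each ticker has no previous close, so its return is `NaN`.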

In [None]:
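# Sort each ticker's rows by Date before lagging: shift(1) uses the previous row, and rows
# may not be in date order after reading many files and setting the index.
dd_shift = dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.sort_values('Date').assign(Close_lag_1 = lambda y: y['Close'].shift(1))
)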

In [None]:
dd_rets = dd_shift.assign(
    Returns = lambda x: x['Close']/x['Close_lag_1'] - 1
)

## Lazy Execution

What does `dd_rets` contain?

In [None]:
dd_rets

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.
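Here is a toy pure-Python analogue that shows the idea without Dask. It is a deliberately swapped-in illustration: Dask builds a task graph and is considerably more sophisticated. In the toy, each operation is only recorded when called, and nothing runs until `compute()`.

```python
from typing import Callable, List, Optional


class LazyPipeline:
    """Toy illustration of deferred execution; NOT how Dask works internally."""

    def __init__(self, data: List[float], steps: Optional[List[Callable]] = None):
        self.data = data
        self.steps = steps or []

    def map(self, fn: Callable) -> "LazyPipeline":
        # Record the step and return a new pipeline; nothing is computed yet.
        return LazyPipeline(self.data, self.steps + [fn])

    def compute(self) -> List[float]:
        # Only here are the recorded steps executed, in order.
        out = list(self.data)
        for fn in self.steps:
            out = [fn(v) for v in out]
        return out


calls = []

def double(v: float) -> float:
    calls.append(v)  # side effect shows when the step actually runs
    return 2 * v

pipeline = LazyPipeline([1.0, 2.0]).map(double).map(lambda v: v + 1)
print(calls)               # [] : nothing has run yet
print(pipeline.compute())  # [3.0, 5.0]
print(calls)               # [1.0, 2.0]
```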

In [None]:
dd_rets.compute()

## Save

+ Apply transformations to calculate daily returns
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [None]:
# Clean up before saving
FEATURES_DATA = os.getenv("FEATURES_DATA")
if os.path.exists(FEATURES_DATA):
    shutil.rmtree(FEATURES_DATA)
dd_rets.to_parquet(FEATURES_DATA, overwrite = True)

# Optional: from Jupyter to Command Line

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object Oriented vs Functional Programming

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelizing data manipulation and modelling tasks benefits from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
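The principles above can be sketched as a small class. The class holds parameters, the computation is a stateless `@staticmethod`, and inputs are never mutated: new objects are returned instead. `ReturnsCalculator` is a hypothetical example and is not the `DataManager` class in `data_manager.py`.

```python
import math
from typing import Dict, List


class ReturnsCalculator:
    """Bundle parameters and stateless functions (hypothetical data-oriented example)."""

    def __init__(self, lag: int = 1):
        # Parameters live on the instance; the data never does.
        self.lag = lag

    @staticmethod
    def simple_returns(prices: List[float], lag: int) -> List[float]:
        # Pure function: builds a new list and never modifies `prices`.
        return [p / prices[i - lag] - 1 for i, p in enumerate(prices) if i >= lag]

    def run(self, prices_by_ticker: Dict[str, List[float]]) -> Dict[str, List[float]]:
        # Returns a new dict instead of updating the input in place.
        return {t: self.simple_returns(px, self.lag) for t, px in prices_by_ticker.items()}


data = {"A": [10.0, 11.0, 12.1]}
out = ReturnsCalculator(lag=1).run(data)
print(all(math.isclose(r, 0.1) for r in out["A"]))  # True
```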

The code is in `./05_src/stock_prices/data_manager.py`.

Our original design was:

![](./images/02_target_pipeline_manager.png)



In [None]:
from stock_prices.data_manager import DataManager
dm = DataManager()

Download all prices.

In [None]:
dm.process_sample_files()

Finally, add features to the data set and save to a *feature store*.

In [None]:
dm.featurize()