In [1]:
# Load the settings defined in ../src/.env; this is a good pattern to follow.
%load_ext dotenv
%dotenv ../src/.env

# What are we doing?

## Objectives


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we are thinking about is called Medallion by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It is an ELT type of thinking, although our data is well-structured.

![Medallion Architecture (Databricks)](./img/medallion-architecture.png)

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./img/target_pipeline_manager.png)

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, first use the following setup:


In [2]:
import pandas as pd
import yfinance as yf
import os
import sys

import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="yfinance")

sys.path.append("../src") # add the src path to jupyter notebook.

A few things to notice in the code chunk above:

+ Libraries are ordered from high-level to low-level libraries from the package manager (pip in this case, but could be conda, poetry, etc.)
+ The command `sys.path.append("../src")` adds the `../src` directory to the path in the Notebook's kernel. This way, we can use our modules as part of the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.
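The `get_logger()` helper itself is not shown in this notebook. As a rough sketch of what such a helper might look like, assuming it simply wraps the standard `logging` module (the function body, the format string, and the default level are assumptions, not the code in `../src`):

```python
import logging


def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a logger with a single stream handler (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach a handler only once, so re-running a notebook cell
    # does not produce duplicated log lines.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger


_logs = get_logger(__name__)
_logs.info("Logger ready.")
```

Passing `__name__` names the logger after the module that creates it, which makes it easier to trace where a message came from.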

Now, to download the historical price data for a stock, we could use:

In [3]:
yf.Ticker("AAPL")

yfinance.Ticker object <AAPL>

In [4]:
stock = yf.Ticker("AAPL")
px_dt = stock.history(start = "2013-12-01", end = "2024-02-01") # be careful with the date format (YYYY-MM-DD).

In [5]:
px_dt.columns

Index(['Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits'], dtype='object')

In [6]:
px_dt.index

DatetimeIndex(['2013-12-02 00:00:00-05:00', '2013-12-03 00:00:00-05:00',
               '2013-12-04 00:00:00-05:00', '2013-12-05 00:00:00-05:00',
               '2013-12-06 00:00:00-05:00', '2013-12-09 00:00:00-05:00',
               '2013-12-10 00:00:00-05:00', '2013-12-11 00:00:00-05:00',
               '2013-12-12 00:00:00-05:00', '2013-12-13 00:00:00-05:00',
               ...
               '2024-01-18 00:00:00-05:00', '2024-01-19 00:00:00-05:00',
               '2024-01-22 00:00:00-05:00', '2024-01-23 00:00:00-05:00',
               '2024-01-24 00:00:00-05:00', '2024-01-25 00:00:00-05:00',
               '2024-01-26 00:00:00-05:00', '2024-01-29 00:00:00-05:00',
               '2024-01-30 00:00:00-05:00', '2024-01-31 00:00:00-05:00'],
              dtype='datetime64[ns, America/New_York]', name='Date', length=2558, freq=None)

## Parametrize the download

+ Generally, we will look to separate every parameter and setting from functions.
+ If we had a few stocks, we could cycle through them. We need a place to store the list of tickers (a db or file, for example).
+ Store a csv file with a few stock tickers. The location of the file is a setting, the contents of this file are parameters.
+ Use **environment variables** to pass parameters.
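A minimal sketch of reading a setting from the environment while failing loudly when it is missing. The helper name `get_setting` is hypothetical, and the `setdefault` line only makes the example self-contained; in this notebook the value comes from `../src/.env`.

```python
import os
from typing import Optional


def get_setting(name: str, default: Optional[str] = None) -> str:
    """Read a setting from the environment; raise if it is missing (hypothetical helper)."""
    value = os.getenv(name, default)
    if value is None:
        raise KeyError(f"Environment variable {name!r} is not set.")
    return value


# Illustrative only: provide a value if the .env file has not been loaded.
os.environ.setdefault("TICKERS", "../data/tickers/sp500_wiki.csv")
tickers_path = get_setting("TICKERS")
print(tickers_path)
```

Raising an error early is one possible design choice: `os.getenv()` alone returns `None` for a missing variable, and the problem would only surface later, for example when `pd.read_csv(None)` is called.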

In [7]:
os.getenv('TICKERS') # we are getting the right data

'../data/tickers/sp500_wiki.csv'

In [9]:
ticker_file

'../data/tickers/sp500_wiki.csv'

In [8]:
ticker_file = os.getenv("TICKERS")
pd.read_csv(ticker_file).head(20)

Unnamed: 0,ticker,Security,GICS Sector,GICS Sub-Industry,Headquarters Location,Date added,CIK,Founded
0,MMM,3M,Industrials,Industrial Conglomerates,"Saint Paul, Minnesota",20883,66740,1902
1,AOS,A. O. Smith,Industrials,Building Products,"Milwaukee, Wisconsin",42942,91142,1916
2,ABT,Abbott,Health Care,Health Care Equipment,"North Chicago, Illinois",20883,1800,1888
3,ABBV,AbbVie,Health Care,Biotechnology,"North Chicago, Illinois",41274,1551152,2013 (1888)
4,ACN,Accenture,Information Technology,IT Consulting & Other Services,"Dublin, Ireland",40730,1467373,1989
5,ADBE,Adobe Inc.,Information Technology,Application Software,"San Jose, California",35555,796343,1982
6,AMD,Advanced Micro Devices,Information Technology,Semiconductors,"Santa Clara, California",42814,2488,1969
7,AES,AES Corporation,Utilities,Independent Power Producers & Energy Traders,"Arlington, Virginia",36070,874761,1981
8,AFL,Aflac,Financials,Life & Health Insurance,"Columbus, Georgia",36308,4977,1955


In [28]:
ticker_file = os.getenv("TICKERS")
tickers = pd.read_csv(ticker_file).head(20)
type(tickers)

pandas.core.frame.DataFrame

Collecting pandas data frames

+ From the [documentation](https://pandas.pydata.org/docs/user_guide/merging.html):

> [`concat()`](https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat) makes a full copy of the data, and iteratively reusing `concat()` can create unnecessary copies. Collect all DataFrame or Series objects in a list before using `concat()`.

+ We can chain operations together using dot notation. Enclose the expression in parentheses and add line breaks for readability.
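Both ideas can be tried on a small synthetic example before touching real downloads. The tickers `AAA` and `BBB` and their values are made up for illustration:

```python
import pandas as pd

# Synthetic frames standing in for per-ticker downloads.
dates = pd.Index(["d1", "d2"], name="Date")
raw = {
    "AAA": pd.DataFrame({"Close": [10.0, 11.0]}, index=dates),
    "BBB": pd.DataFrame({"Close": [20.0, 19.0]}, index=dates),
}

frames = []
for ticker, df in raw.items():
    frame = (
        df
        .reset_index()            # move Date from the index into a column
        .assign(ticker=ticker)    # tag every row with its ticker
    )
    frames.append(frame)          # collect first ...

# ... and concatenate once, instead of calling concat() inside the loop.
combined = pd.concat(frames, axis=0, ignore_index=True)
print(combined)
```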

In [32]:
px_list = list()
for k, row in tickers.iterrows():
    #print(f'The row is {row["ticker"]}\n\n')
    stock = yf.Ticker(row['ticker'])
    px = (stock
          .history(start = '2013-12-01',
                   end = '2024-02-01')
                   .reset_index()
                   .assign(ticker=row['ticker']))
    if px.shape[0] == 0:
        print(f'No data for {row["ticker"]}')  # Validate: do not fail silently.
    px_list.append(px)

px_dt = pd.concat(px_list, axis = 0) # axis = 0 stacks the data frames on top of each other (by rows).
px_dt

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker
0,2013-12-02 00:00:00-05:00,93.398045,93.632126,90.362095,90.567802,6897900,0.0,0.0,MMM
1,2013-12-03 00:00:00-05:00,89.418686,90.362102,88.773195,89.801727,7864500,0.0,0.0,MMM
2,2013-12-04 00:00:00-05:00,89.340661,90.503969,89.014368,89.702423,3450700,0.0,0.0,MMM
3,2013-12-05 00:00:00-05:00,89.659845,90.546513,89.532165,89.964859,2842700,0.0,0.0,MMM
4,2013-12-06 00:00:00-05:00,90.929567,91.362261,90.688390,91.227486,2886100,0.0,0.0,MMM
...,...,...,...,...,...,...,...,...,...
2553,2024-01-25 00:00:00-05:00,84.713026,84.852144,83.947880,84.355293,1372600,0.0,0.0,AFL
2554,2024-01-26 00:00:00-05:00,84.544097,84.842203,83.918060,84.772644,1168000,0.0,0.0,AFL
2555,2024-01-29 00:00:00-05:00,84.395037,84.762709,83.540453,84.057175,1737600,0.0,0.0,AFL
2556,2024-01-30 00:00:00-05:00,84.086994,85.209873,83.679572,85.190002,1663600,0.0,0.0,AFL


In [33]:
px_dt[px_dt['ticker'] =='ABBV']

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker
0,2013-12-02 00:00:00-05:00,31.886691,32.011633,31.577622,31.820930,4321900,0.0,0.0,ABBV
1,2013-12-03 00:00:00-05:00,32.307550,33.050628,32.202335,32.859928,9480100,0.0,0.0,ABBV
2,2013-12-04 00:00:00-05:00,32.715264,33.070364,32.419346,32.662655,5309200,0.0,0.0,ABBV
3,2013-12-05 00:00:00-05:00,32.498247,32.925683,32.254938,32.721828,5449100,0.0,0.0,ABBV
4,2013-12-06 00:00:00-05:00,32.991443,33.866043,32.912532,33.767403,7285800,0.0,0.0,ABBV
...,...,...,...,...,...,...,...,...,...
2553,2024-01-25 00:00:00-05:00,164.000000,165.210007,163.199997,165.130005,4465800,0.0,0.0,ABBV
2554,2024-01-26 00:00:00-05:00,165.270004,165.860001,163.500000,164.399994,4654300,0.0,0.0,ABBV
2555,2024-01-29 00:00:00-05:00,165.850006,166.759995,163.679993,163.910004,4704100,0.0,0.0,ABBV
2556,2024-01-30 00:00:00-05:00,164.220001,164.979996,163.259995,164.919998,3819600,0.0,0.0,ABBV


In [None]:
# List to hold final results
px_list = list()

for k, row in tickers.iterrows():  # Produces an iterator that returns index and row

    stock = yf.Ticker(row['ticker'])
    print(f'Processing {row["ticker"]}')
    
    px = (stock
          .history(start = pd.to_datetime("2013-12-01"), 
                   end = pd.to_datetime("2024-02-01"))
          .reset_index()   # Reset index to get date as a column
          .assign(ticker = row['ticker']))    # Add ticker
    
    if px.shape[0] == 0:
        print(f'No data for {row["ticker"]}')  # Validate: do not fail silently.
        continue
    print(f'Downloaded {px.shape}.')
    px_list.append(px)
px_dt = pd.concat(px_list, axis = 0)
print(f'Final shape {px_dt.shape}.')

## Reliability

+ Kleppmann (2017) defines *reliability* as:

    - A system should continue to work correctly. 
    - To work correctly means performing the correct function at the desired level of performance, even in the face of adversity such as hardware or software faults, and even human error. 

+ *Faults* are things that can go wrong.
+ Systems that can cope with (certain types of) faults are called *fault-tolerant* or *resilient*.
+ A fault is different from a failure. 
    
    - A *fault* occurs when a component of the system deviates from spec.
    - A *failure*  is when the system stops providing the required service to the user.

+ In our simple example, we handle the fault that occurs when one ticker is not found and log it using *warning*.
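The distinction between a fault and a failure can be sketched with a stand-in download function. Here `fake_download` and `download_many` are hypothetical names used only for illustration; they are not part of yfinance or of the code in `../src`.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("downloader_sketch")


def fake_download(ticker: str) -> list:
    """Hypothetical stand-in for a price download; unknown tickers return no rows."""
    known = {"MMM": [90.5, 89.8], "ABT": [30.1, 30.4]}
    return known.get(ticker, [])


def download_many(tickers: list) -> dict:
    """Download each ticker, logging a warning (not failing) when one has no data."""
    results = {}
    for ticker in tickers:
        rows = fake_download(ticker)
        if not rows:
            # The fault (one missing ticker) is recorded and contained,
            # so it does not become a failure of the whole run.
            logger.warning("No data for %s", ticker)
            continue
        results[ticker] = rows
    return results


prices = download_many(["MMM", "NOT_A_TICKER", "ABT"])
print(sorted(prices))
```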


# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.

To measure the space each format requires, we traverse the output directory and total the sizes of the files in it. To measure time, we record the clock before and after each write.
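The cells in this section time each write with `time.time()`. One possible alternative, shown only as a sketch, is a small helper built on `time.perf_counter()`, a clock intended for measuring short intervals. The name `timed` is hypothetical.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


total, seconds = timed(sum, range(1_000_000))
print(f"sum took {seconds:.4f} seconds")
```

Assuming a data frame `px_dt` and a path `stock_path` exist, the same helper could wrap a write as `timed(px_dt.to_csv, stock_path, index=False)`.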

In [34]:
def get_dir_size(path='.'):
    '''Returns the total size of files contained in path.'''
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size # add this file's size to the running total
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [35]:
import time

In [36]:
temp = os.getenv("TEMP_DATA") # the directory is defined in the .env file
os.makedirs(temp, exist_ok=True) # create the directory if it does not exist; otherwise do nothing
stock_path = os.path.join(temp, "stock_px.csv")

In [37]:
start = time.time()
px_dt.to_csv(stock_path, index = False)
end = time.time()

print(f'Writing to dt ({px_dt.shape})csv took {end - start} seconds.')
print(f'Csv file size { os.path.getsize(stock_path)*1e-6 } MB')

Writing to dt ((23022, 9))csv took 0.8207588195800781 seconds.
Csv file size 2.759911 MB


## Save Data to Parquet

### Dask 

We can work with large data sets and parquet files. In fact, recent versions of pandas support pyarrow data types and future versions will require a pyarrow backend. The pyarrow library is an interface between Python and the Apache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

By default, pandas' `to_parquet()` writes a single file, whereas Dask writes one file per partition, so we can take advantage of the partitions.

Notice the difference in time and space between Parquet and CSV.

In [38]:
import dask # Dask lets us read and write the parquet format in partitions.
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

In [41]:
px_dd = dd.from_pandas(px_dt, npartitions = len(tickers))
parquet_path = os.path.join(temp, "stock_px.parquet")

start = time.time()
px_dd.to_parquet(parquet_path, engine = "pyarrow")
end = time.time()

print(f'Writing dd ({px_dt.shape}) to parquet took {end - start} seconds.')
print(f'Parquet file size { get_dir_size(parquet_path)*1e-6 } MB')

Writing dd ((23022, 9)) to parquet took 0.10099649429321289 seconds.
Parquet file size 1.0615679999999998 MB


### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a useful implementation to manipulate data stored in parquets.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases such as small-to-large joins, where the small dataframe fits in memory, but the large one does not. 
    - If possible, use pandas: reduce, then use pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. Only a one-dimensional index is allowed.
    - Avoid (or minimize) full-data shuffling: setting an index is an expensive operation. 
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and date, and update only the latest period. Using a namespace, we create a directory per stock and a parquet file per year; only the current year's file is rewritten (parquet files are immutable). That is why the partitions are important. 
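The layout can be sketched with two small pure functions. Both names (`partition_path`, `years_to_rewrite`) and the root directory `"prices"` are hypothetical and serve only to illustrate the idea of rewriting just the current year.

```python
import os


def partition_path(root: str, ticker: str, year: int) -> str:
    """Return <root>/<ticker>/<ticker>_<year>.parquet."""
    return os.path.join(root, ticker, f"{ticker}_{year}.parquet")


def years_to_rewrite(existing_years: set, data_years: set, current_year: int) -> set:
    """Years whose files are missing, plus the current year, which is always rewritten."""
    missing = data_years - existing_years
    return missing | ({current_year} & data_years)


path = partition_path("prices", "AAPL", 2024)
todo = years_to_rewrite({2022, 2023, 2024}, {2022, 2023, 2024}, current_year=2024)
print(path, todo)
```

Under this scheme, files for past years are written once and left alone, which sidesteps the fact that a parquet file cannot be modified in place.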



In [46]:
PRICE_DATA = os.getenv("PRICE_DATA")

for ticker in px_dt.ticker.unique():
    ticker_dt = px_dt[px_dt.ticker == ticker] # subset the data frame ticker by ticker
    ticker_dt = ticker_dt.assign(year = ticker_dt.Date.dt.year) # use the .dt accessor to extract the year from Date
    for yr in ticker_dt.year.unique():
        yr_dt = ticker_dt[ticker_dt.year == yr]
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}.parquet")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        yr_dt.to_parquet(yr_path, engine = "pyarrow")
    

In [45]:
px_dt['Date'].dt.year

0       2013
1       2013
2       2013
3       2013
4       2013
        ... 
2553    2024
2554    2024
2555    2024
2556    2024
2557    2024
Name: Date, Length: 23022, dtype: int32

In [None]:
# The two lines below are equivalent:
# px_dt['year'] = px_dt['Date'].dt.year
# px_dt = px_dt.assign(year = px_dt.Date.dt.year)

Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.
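To see what a `*/*.parquet` pattern matches, here is a self-contained sketch that builds a throwaway directory tree with empty placeholder files (they are not valid parquet files, and the tickers are made up) and lists the matches:

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Build a <root>/<ticker>/<ticker>_<year>.parquet layout with empty files.
    for ticker, year in [("AAA", 2023), ("AAA", 2024), ("BBB", 2024)]:
        os.makedirs(os.path.join(root, ticker), exist_ok=True)
        open(os.path.join(root, ticker, f"{ticker}_{year}.parquet"), "w").close()

    # The first "*" matches one directory level (the ticker);
    # "*.parquet" matches the file names inside it.
    matches = sorted(glob(os.path.join(root, "*", "*.parquet")))
    names = [os.path.basename(m) for m in matches]

print(names)
```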

In [49]:
from glob import glob
parquet_files = glob(os.path.join(PRICE_DATA, '*/*.parquet'))
#parquet_files = glob(PRICE_DATA+"/**/*.parquet/*.parquet")
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

In [50]:
dd_px # Dask is a big data library with lazy execution; that is why no data is shown here.

<dask_expr.expr.DataFrame: expr=SetIndex(frame=ReadParquet(05f1383), _other=ticker, options={})>

In [52]:
dd_px.compute()

Unnamed: 0_level_0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,year
ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
ABBV,2013-12-02 00:00:00-05:00,31.886691,32.011633,31.577622,31.820930,4321900,0.0,0.0,2013
ABBV,2013-12-03 00:00:00-05:00,32.307550,33.050628,32.202335,32.859928,9480100,0.0,0.0,2013
ABBV,2013-12-04 00:00:00-05:00,32.715264,33.070364,32.419346,32.662655,5309200,0.0,0.0,2013
ABBV,2013-12-05 00:00:00-05:00,32.498247,32.925683,32.254938,32.721828,5449100,0.0,0.0,2013
ABBV,2013-12-06 00:00:00-05:00,32.991443,33.866043,32.912532,33.767403,7285800,0.0,0.0,2013
...,...,...,...,...,...,...,...,...,...
MMM,2024-01-25 00:00:00-05:00,92.247775,94.706929,92.080554,94.411835,6122900,0.0,0.0,2024
MMM,2024-01-26 00:00:00-05:00,94.647914,95.316805,94.224940,94.421669,3720200,0.0,0.0,2024
MMM,2024-01-29 00:00:00-05:00,94.431509,95.306967,93.880661,94.805298,3800000,0.0,0.0,2024
MMM,2024-01-30 00:00:00-05:00,94.598729,94.933178,93.231440,94.185593,3200500,0.0,0.0,2024


In [54]:
dd_px[dd_px.index == "ABBV"].compute() # if ticker is not set as the index, filter on the ticker column instead.

Unnamed: 0_level_0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,year
ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
ABBV,2013-12-02 00:00:00-05:00,31.886691,32.011633,31.577622,31.820930,4321900,0.0,0.0,2013
ABBV,2013-12-03 00:00:00-05:00,32.307550,33.050628,32.202335,32.859928,9480100,0.0,0.0,2013
ABBV,2013-12-04 00:00:00-05:00,32.715264,33.070364,32.419346,32.662655,5309200,0.0,0.0,2013
ABBV,2013-12-05 00:00:00-05:00,32.498247,32.925683,32.254938,32.721828,5449100,0.0,0.0,2013
ABBV,2013-12-06 00:00:00-05:00,32.991443,33.866043,32.912532,33.767403,7285800,0.0,0.0,2013
...,...,...,...,...,...,...,...,...,...
ABBV,2024-01-25 00:00:00-05:00,164.000000,165.210007,163.199997,165.130005,4465800,0.0,0.0,2024
ABBV,2024-01-26 00:00:00-05:00,165.270004,165.860001,163.500000,164.399994,4654300,0.0,0.0,2024
ABBV,2024-01-29 00:00:00-05:00,165.850006,166.759995,163.679993,163.910004,4704100,0.0,0.0,2024
ABBV,2024-01-30 00:00:00-05:00,164.220001,164.979996,163.259995,164.919998,3819600,0.0,0.0,2024


## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.
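The return calculation can be checked by hand on a tiny pandas frame. This sketch uses made-up prices and, for brevity, `groupby(...).shift()` rather than `groupby(...).apply()`; it is a pandas-only illustration, not the Dask pipeline itself.

```python
import pandas as pd

px = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
    "Close":  [100.0, 110.0, 99.0, 50.0, 50.0, 55.0],
})

rets = (
    px
    # Lag the close within each ticker so prices never leak across tickers.
    .assign(Close_lag_1=lambda x: x.groupby("ticker")["Close"].shift(1))
    # Simple return: today's close over yesterday's close, minus one.
    .assign(returns=lambda x: x["Close"] / x["Close_lag_1"] - 1)
    # Indicator: 1 if the return is positive, 0 otherwise (including missing).
    .assign(positive_return=lambda x: (x["returns"] > 0) * 1)
)
print(rets)
```

The first row of each ticker has no lagged price, so its return is missing (`NaN`) and its indicator is 0.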

In [55]:
import numpy as np
# This is a map-reduce style operation: to compute returns we need a lagged price series within each ticker, so we group by ticker and apply; the return is the ratio of the close to the lagged close, minus one.
dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
).assign(
    returns = lambda x: x['Close']/x['Close_lag_1'] - 1
).assign(
    positive_return = lambda x: (x['returns'] > 0)*1
))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(


## Lazy Execution

What does `dd_rets` contain?

In [None]:
dd_rets

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.
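The idea of deferring work can be illustrated with a toy class in plain Python. This is only an analogy: it is not how Dask is implemented, and the `Lazy` class and the functions `load` and `double` are made up for the example.

```python
class Lazy:
    """A toy deferred computation: stores a function and runs it only on compute()."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any lazy arguments first, then run this step.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


calls = []  # records which steps have actually run


def load():
    calls.append("load")
    return [1, 2, 3]


def double(values):
    calls.append("double")
    return [2 * v for v in values]


plan = Lazy(double, Lazy(load))   # building the plan runs nothing
result = plan.compute()           # both steps run here, in dependency order
print(result, calls)
```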

In [None]:
dd_rets.compute()

## Save

+ Apply transformations to calculate daily returns.
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [None]:
features_path = os.getenv("FEATURES_DATA")
dd_rets.to_parquet(features_path, engine = "pyarrow")

# A few notes

## Jupyter?

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object oriented programming?

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefits from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
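A rough sketch of this idea follows. The class `ReturnsPipelineSketch` is hypothetical and is not the `DataManager` in `./src/`; it only shows parameters held on an immutable instance while the work is done by a stateless static method.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReturnsPipelineSketch:
    """Hypothetical example: bundles parameters with stateless functions."""

    lag: int = 1

    @staticmethod
    def simple_returns(prices: list, lag: int) -> list:
        """Pure function: builds a new list and leaves `prices` untouched."""
        return [None] * lag + [
            prices[i] / prices[i - lag] - 1 for i in range(lag, len(prices))
        ]

    def run(self, prices: list) -> list:
        # The instance only supplies parameters; the work is done statelessly.
        return self.simple_returns(prices, self.lag)


pipeline = ReturnsPipelineSketch(lag=1)
prices = [100.0, 110.0, 99.0]
returns = pipeline.run(prices)
print(returns)
```

Because `simple_returns` depends only on its arguments, it can be tested in isolation and called from parallel workers without sharing state.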

The code is in `./src/`.

Our original design was:

![](./img/target_pipeline_manager.png)

Our resulting interface is:

In [None]:

from data_manager import DataManager
dm = DataManager()

In [None]:
dm.download_all()

In [None]:
dm.featurize()