In [1]:
%load_ext dotenv
%dotenv ../src/.env

# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we have in mind is called Medallion by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) way of thinking, although our data is already well-structured.

![Medallion Architecture (Databricks)](./img/medallion-architecture.png)

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./img/target_pipeline_manager.png)

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, use:


In [2]:
import sys
sys.path.append("../src")
import yfinance as yf
from logger import get_logger

_logs = get_logger(__name__)


import pandas as pd
import os

import warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="yfinance")




stock = yf.Ticker("AAPL")
px = stock.history(start = "2013-12-01", end = "2024-02-01")

+ With several stocks, we can loop over their tickers. 
+ Store the tickers in a CSV file and read its location from the `TICKERS` environment variable.
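
The sketch below shows one possible shape for the ticker file and how its path can travel through an environment variable. The temporary directory and the three tickers are illustrative assumptions; in the notebook, `TICKERS` is set in the `.env` file instead of in code.

```python
import os
import tempfile

import pandas as pd

# Write a small ticker file with a single 'ticker' column (illustrative data).
tmp_dir = tempfile.mkdtemp()
ticker_file = os.path.join(tmp_dir, "tickers.csv")
pd.DataFrame({"ticker": ["MSFT", "AAPL", "NVDA"]}).to_csv(ticker_file, index=False)

# Here the variable is set in code; normally it would come from the .env file.
os.environ["TICKERS"] = ticker_file

# Read the setting back and fail early if it is missing.
path = os.getenv("TICKERS")
if path is None:
    raise RuntimeError("The TICKERS environment variable is not set.")
tickers = pd.read_csv(path)
```

Checking for `None` gives a clearer error than letting `pd.read_csv` fail on a missing path.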

In [3]:

ticker_file = os.getenv("TICKERS")
tickers = pd.read_csv(ticker_file).head(20)

In [4]:
px_list = list()
for k, row in tickers.iterrows():
    stock = yf.Ticker(row['ticker'])
    _logs.info(f'Processing {row["ticker"]}')
    px = (stock
          .history(start = pd.to_datetime("2013-12-01"), 
                   end = pd.to_datetime("2024-02-01"))
          .reset_index()
          .assign(ticker = row['ticker']))
    if px.shape[0] == 0:
        _logs.warning(f'No data for {row["ticker"]}')
        continue
    _logs.info(f'Downloaded {px.shape}.')
    px_list.append(px)
px_dt = pd.concat(px_list, axis = 0)
_logs.info(f'Final shape {px_dt.shape}.')

2024-02-11 21:01:35,077, 1062139504.py, 4, INFO, Processing MSFT
2024-02-11 21:01:35,189, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,190, 1062139504.py, 4, INFO, Processing AAPL
2024-02-11 21:01:35,214, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,216, 1062139504.py, 4, INFO, Processing NVDA
2024-02-11 21:01:35,315, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,315, 1062139504.py, 4, INFO, Processing AMZN
2024-02-11 21:01:35,454, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,456, 1062139504.py, 4, INFO, Processing META
2024-02-11 21:01:35,555, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,556, 1062139504.py, 4, INFO, Processing GOOGL
2024-02-11 21:01:35,767, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,768, 1062139504.py, 4, INFO, Processing GOOG
2024-02-11 21:01:35,884, 1062139504.py, 13, INFO, Downloaded (2558, 9).
2024-02-11 21:01:35,886, 1062139504.py, 

+ We have some data. How do we store it?
+ We can compare two options: CSV and Parquet.

# Storing Data in CSV



In [5]:
temp = os.getenv("TEMP_DATA")
os.makedirs(temp, exist_ok=True)
stock_path = os.path.join(temp, "stock_px.csv")

In [6]:
import time

In [7]:
start = time.time()
px_dt.to_csv(stock_path, index = False)
end = time.time()
_logs.info(f'Writing to dt ({px_dt.shape})csv took {end - start} seconds.')
_logs.info(f'Csv file size { os.path.getsize(stock_path)*1e-6 } MB')

2024-02-11 21:01:39,434, 3309402642.py, 4, INFO, Writing to dt ((48602, 9))csv took 0.35765790939331055 seconds.
2024-02-11 21:01:39,434, 3309402642.py, 5, INFO, Csv file size 5.8630249999999995 MB


## Save Data to Parquet

## Dask 

We can work with large data sets and parquet files. In fact, recent versions of pandas support pyarrow data types, and pyarrow is expected to play a larger role in future pandas versions. The pyarrow library is the Python interface to the Apache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than Arrow. It provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [8]:
import dask.dataframe as dd 

In a future release, Dask DataFrame will use new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

    $ pip install dask-expr

and turning the query planning option on:

    >>> import dask
    >>> dask.config.set({'dataframe.query-planning': True})
    >>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues 

  import dask.dataframe as dd


In [9]:
def get_dir_size(path='.'):
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total


px_dd = dd.from_pandas(px_dt, npartitions = len(tickers))
parquet_path = os.path.join(temp, "stock_px.parquet")
start = time.time()
px_dd.to_parquet(parquet_path, engine = "pyarrow")
end = time.time()
_logs.info(f'Writing to dd ({px_dt.shape}) parquet took {end - start} seconds.')
_logs.info(f'Parquet file size { get_dir_size(parquet_path)*1e-6 } MB')

2024-02-11 21:01:39,850, 399157755.py, 17, INFO, Writing to dd ((48602, 9)) parquet took 0.1556870937347412 seconds.
2024-02-11 21:01:39,851, 399157755.py, 18, INFO, Parquet file size 2.242184 MB


# How do we store prices?

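+ We can store a single blob of data. This can be difficult to maintain, especially as parquet files are immutable: changing one row means rewriting the whole file.
+ Strategy: organize data files by ticker and date, so that only the most recent period needs to be rewritten. The code below uses one file per ticker and year.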
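
To make the maintenance argument concrete, the sketch below works out which files a batch of new rows would touch under a ticker-and-year layout. The helper names `partition_path` and `touched_partitions`, the `prices` root folder, and the sample rows are all hypothetical; nothing is written to disk.

```python
import os

import pandas as pd


def partition_path(root, ticker, year):
    """Hypothetical layout: <root>/<ticker>/<ticker>_<year>.parquet."""
    return os.path.join(root, ticker, f"{ticker}_{year}.parquet")


def touched_partitions(new_rows, root):
    """Return the sorted file paths that the new rows would require rewriting."""
    keys = (new_rows
            .assign(year=new_rows["Date"].dt.year)[["ticker", "year"]]
            .drop_duplicates())
    return sorted(partition_path(root, t, y)
                  for t, y in keys.itertuples(index=False))


# Three new rows covering two tickers, all within 2024 (illustrative values).
new_rows = pd.DataFrame({
    "Date": pd.to_datetime(["2024-01-30", "2024-01-31", "2024-01-31"]),
    "ticker": ["AAPL", "AAPL", "XOM"],
    "Close": [1.0, 2.0, 3.0],
})
paths = touched_partitions(new_rows, "prices")
```

Only the two 2024 files appear in `paths`; the files for earlier years are left alone.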



In [10]:
PRICE_DATA = os.getenv("PRICE_DATA")
for ticker in px_dt.ticker.unique():
    ticker_dt = px_dt[px_dt.ticker == ticker]
    ticker_dt = ticker_dt.assign(year = ticker_dt.Date.dt.year)
    for yr in ticker_dt.year.unique():
        yr_dt = ticker_dt[ticker_dt.year == yr]
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}.parquet")
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        yr_dt.to_parquet(yr_path, engine = "pyarrow")
    

Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.

In [40]:
from glob import glob

parquet_files = glob(PRICE_DATA+"/**/*.parquet")
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

In [41]:
import numpy as np
dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
).assign(
    log_returns = lambda x: np.log(x['Close']/x['Close_lag_1']), 
    returns = lambda x: x['Close']/x['Close_lag_1'] - 1
))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(


In [42]:
dd_rets.compute()

| ticker | Date | Open | High | Low | Close | Volume | Dividends | Stock Splits | year | Close_lag_1 | log_returns | returns |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| AAPL | 2013-12-02 00:00:00-05:00 | 17.448944 | 17.646886 | 17.224423 | 17.237244 | 472544800 | 0.0 | 0.0 | 2013 | NaN | NaN | NaN |
| AAPL | 2013-12-03 00:00:00-05:00 | 17.458325 | 17.710991 | 17.438937 | 17.709114 | 450968000 | 0.0 | 0.0 | 2013 | 17.237244 | 0.027007 | 0.027375 |
| AAPL | 2013-12-04 00:00:00-05:00 | 17.683472 | 17.798861 | 17.537126 | 17.667837 | 377809600 | 0.0 | 0.0 | 2013 | 17.709114 | -0.002334 | -0.002331 |
| AAPL | 2013-12-05 00:00:00-05:00 | 17.907059 | 17.984923 | 17.711932 | 17.758524 | 447580000 | 0.0 | 0.0 | 2013 | 17.667837 | 0.005120 | 0.005133 |
| AAPL | 2013-12-06 00:00:00-05:00 | 17.692533 | 17.722554 | 17.498032 | 17.512102 | 344352400 | 0.0 | 0.0 | 2013 | 17.758524 | -0.013973 | -0.013876 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| XOM | 2024-01-25 00:00:00-05:00 | 100.309998 | 102.180000 | 99.620003 | 102.129997 | 22089500 | 0.0 | 0.0 | 2024 | 99.599998 | 0.025084 | 0.025402 |
| XOM | 2024-01-26 00:00:00-05:00 | 101.970001 | 103.080002 | 101.190002 | 103.000000 | 20817200 | 0.0 | 0.0 | 2024 | 102.129997 | 0.008483 | 0.008519 |
| XOM | 2024-01-29 00:00:00-05:00 | 102.980003 | 103.199997 | 101.860001 | 103.129997 | 18317500 | 0.0 | 0.0 | 2024 | 103.000000 | 0.001261 | 0.001262 |
| XOM | 2024-01-30 00:00:00-05:00 | 102.410004 | 104.879997 | 102.099998 | 104.849998 | 19610900 | 0.0 | 0.0 | 2024 | 103.129997 | 0.016540 | 0.016678 |


## Enrich data

+ Apply transformations to calculate daily returns.
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?
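
For reference, the return calculation can be sketched in plain pandas on a handful of rows. The closing prices are taken from the output shown in this notebook; the use of `sort_values` and a grouped `shift` is one possible approach, not necessarily how the final pipeline does it.

```python
import numpy as np
import pandas as pd

px = pd.DataFrame({
    "ticker": ["AAPL", "AAPL", "AAPL", "XOM", "XOM"],
    "Date": pd.to_datetime(["2013-12-02", "2013-12-03", "2013-12-04",
                            "2024-01-25", "2024-01-26"]),
    "Close": [17.237244, 17.709114, 17.667837, 102.129997, 103.000000],
})

# Lags only make sense if rows are ordered by date within each ticker.
px = px.sort_values(["ticker", "Date"]).reset_index(drop=True)

# Shift within each ticker so one stock's price never leaks into another's.
px["Close_lag_1"] = px.groupby("ticker")["Close"].shift(1)

# Simple return: P_t / P_{t-1} - 1.  Log return: ln(P_t / P_{t-1}).
px["returns"] = px["Close"] / px["Close_lag_1"] - 1
px["log_returns"] = np.log(px["Close"] / px["Close_lag_1"])
```

The first row of every ticker has no previous close, so its lag and both return columns are missing.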

In [18]:
px_dd.assign(
    Close_lag_1 = px_dd.groupby("ticker")["Close"].shift(1)
)

  Before: .shift(1)
  After:  .shift(1, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .shift(1, meta=('x', 'f8'))            for series result
  Close_lag_1 = px_dd.groupby("ticker")["Close"].shift(1)


ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.