# What are we doing?

## Objectives 


* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we are thinking about is called Medallion by [DataBricks](https://www.databricks.com/glossary/medallion-architecture). It is an ELT type of thinking, although our data is well-structured.

![Medallion Architecture (DataBicks)](../images/02_medallion_architecture.png)

+ In our case, we would like to optimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](../images/02_target_pipeline_manager.png)

In [1]:
%load_ext dotenv
%dotenv 

In [2]:
import os
os.getenv('LOG_DIR')

'../../07_logs/'

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, first use the following setup:


In [3]:
import pandas as pd
import yfinance as yf
import os
import sys

sys.path.append(os.getenv('SRC_DIR'))

A few things to notice in the code chunk above:

+ Libraries are ordered from high-level to low-level libraries from the package manager (pip in this case, but could be conda, poetry, etc.)
+ The command `sys.path.append("../05_src/)` will add the `../05_src/` directory to the path in the Notebook's kernel. This way, we can use our modules as part of the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.

Now, to download the historical price data for a stock, we could use:

In [4]:
stock = yf.Ticker("AAPL")
px = stock.history(start = "2013-12-01", end = "2024-02-01")

In [5]:
px

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2013-12-02 00:00:00-05:00,17.425308,17.622981,17.201090,17.213894,472544800,0.0,0.0
2013-12-03 00:00:00-05:00,17.434679,17.687004,17.415318,17.685129,450968000,0.0,0.0
2013-12-04 00:00:00-05:00,17.659515,17.774748,17.513368,17.643902,377809600,0.0,0.0
2013-12-05 00:00:00-05:00,17.882807,17.960564,17.687943,17.734472,447580000,0.0,0.0
2013-12-06 00:00:00-05:00,17.668577,17.698557,17.474339,17.488390,344352400,0.0,0.0
...,...,...,...,...,...,...,...
2024-01-25 00:00:00-05:00,194.707126,195.754371,192.602669,193.659882,54822100,0.0,0.0
2024-01-26 00:00:00-05:00,193.759620,194.248323,191.435740,191.914474,44594000,0.0,0.0
2024-01-29 00:00:00-05:00,191.505536,191.695039,189.081927,191.226273,47145600,0.0,0.0
2024-01-30 00:00:00-05:00,190.438365,191.296106,186.977480,187.545975,55859400,0.0,0.0


## Parametrize the download

+ Generally, we will look to separate every parameter and setting from functions.
+ If we had a few stocks, we could cycle through them. We need a place to store the list of tickers (a db or file, for example).
+ Store a csv file with a few stock tickers. The location of the file is a setting, the contents of this file are parameters.
+ Use **environment variables** to pass parameters.

In [7]:
ticker_file = os.getenv("TICKERS")
tickers = pd.read_csv(ticker_file).sample(50, random_state=42)
tickers

Unnamed: 0,ticker,Security,GICS Sector,GICS Sub-Industry,Headquarters Location,Date added,CIK,Founded
268,JCI,Johnson Controls,Industrials,Building Products,"Cork, Ireland",40417,833444,1885
73,BSX,Boston Scientific,Health Care,Health Care Equipment,"Marlborough, Massachusetts",34754,885725,1979
289,LIN,Linde plc,Materials,Industrial Gases,"Guildford, United Kingdom",33786,1707925,1879
155,DOW,Dow Inc.,Materials,Commodity Chemicals,"Midland, Michigan",43556,1751788,2019 (1897)
104,CVX,Chevron Corporation,Energy,Integrated Oil & Gas,"San Ramon, California",20883,93410,1879
280,KHC,Kraft Heinz,Consumer Staples,Packaged Foods & Meats,"Chicago, Illinois; Pittsburgh, Pennsylvania",42191,1637459,2015 (1869)
392,RL,Ralph Lauren Corporation,Consumer Discretionary,"Apparel, Accessories & Luxury Goods","New York City, New York",39115,1037038,1967
124,ED,Consolidated Edison,Utilities,Multi-Utilities,"New York City, New York",20883,1047862,1823
68,BX,Blackstone,Financials,Asset Management & Custody Banks,"New York City, New York",45187,1393818,1985
244,IBM,IBM,Information Technology,IT Consulting & Other Services,"Armonk, New York",20883,51143,1911


Collecting padas data frames

+ From the [documentation](https://pandas.pydata.org/docs/user_guide/merging.html):

> [`concat()`](https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat) makes a full copy of the data, and iteratively reusing `concat()` can create unnecessary copies. Collect all DataFrame or Series objects in a list before using `concat()`.

+ We can string operation togethers using dot operations. Enclose the line in parenthesis and add linebreaks for readability.

## Reliability

+ Keppelman (2017) defines *reliability* as:

    - A system should continue to work correctly. 
    - To work correctly means performing the correct function at the desired level of performance, even in the face of adversity such as hardware or software faults, and even human error. 

+ *Faults* are things that can go wrong.
+ Sytems that can cope with (certain types of) faults are called *fault-tolerant* or *resilient*.
+ A fault is different than a failure. 
    
    - A *fault* occurs when a component of the system deviates from spec.
    - A *failure*  is when the system stops providing the required service to the user.

+ In our simple example, we handle the fault that occurs when one ticker is not found and log it using *warning*.


# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parqruet, by measuring their performance:

    - Time to save.
    - Space required.

## Save Data to Parquet

### Dask 

We can work with with large data sets and parquet files. In fact, recent versions of pandas support pyarrow data types and future versions will require a pyarrow backend. The pyarrow library is an interface between Python and the Appache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a useful implementation to manipulate data stored in parquets.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases suchas small to larrge joins, where the small dataframe fits in memory, but the large one does not. 
    - If possible, use pandas: reduce, then use pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. A one-dimensional index is allowed.
    - Avoid (or minimize) full-data shuffling: indexing is an expensive operations. 
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and date. Update only latest month.



Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.

## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.

## Lazy Exection

What does `dd_rets` contain?

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.

## Save

+ Apply transformations to calculate daily returns
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

# A few notes

# Jupyter? 

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object oriented programming?

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefits from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).

The code is in `./05_src/`.

Our original design was:

![](../images/02_target_pipeline_manager.png)

Our resulting interface is: