In [1]:
%load_ext dotenv
%dotenv ../src/.env

import os
os.getenv('PRICE_DATA')

'../data/prices/'

# What are we doing?

## Objectives 

* Build a data pipeline that downloads price data from the internet, stores it locally, transforms it into return data, and stores the feature set.
    - Getting the data.
    - Schemas and index in dask.

* Explore the parquet format.
    - Reading and writing parquet files.
    - Read datasets that are stored in distributed files.
    - Discuss dask vs pandas as a small example of big vs small data.
    
* Discuss the use of environment variables for settings.
* Discuss how to use Jupyter notebooks and source code concurrently. 
* Logging and using a standard logger.

## About the Data

+ We will download the prices for a list of stocks.
+ The source is Yahoo Finance and we will use the API provided by the library yfinance.


## Medallion Architecture

+ The architecture that we have in mind is the Medallion architecture described by [Databricks](https://www.databricks.com/glossary/medallion-architecture). It follows an ELT (extract, load, transform) way of thinking, although our data is well-structured.

![Medallion Architecture (Databricks)](./img/medallion-architecture.png)

+ In our case, we would like to minimize the number of times that we download data from the internet. 
+ Ultimately, we will build a pipeline manager class that will help us control the process of obtaining and transforming our data.

![](./img/target_pipeline_manager.png)

# Download Data from Yahoo Finance

Yahoo Finance provides information about public stocks in different markets. The library yfinance gives us access to a fair bit of the data in Yahoo Finance. 

These steps are based on the instructions in:

+ [yfinance documentation](https://pypi.org/project/yfinance/)
+ [Tutorial in geeksforgeeks.org](https://www.geeksforgeeks.org/get-financial-data-from-yahoo-finance-with-python/)


+ If required, install: `python -m pip install yfinance`.
+ To download the price history of a stock, first use the following setup:


In [2]:
# loading libraries
import pandas as pd
import yfinance as yf

# os library to get access to environment files
# already called  above
# import os
import sys

import warnings
# to silence warnings produced by yfinance, not necessary
warnings.filterwarnings("ignore", category=FutureWarning, module="yfinance")

# sys.path is the list of directories that Python searches when it imports a module.
# For example, "import os" works because the directory that holds the os library is already on that path.
# Our own code lives in ../src, so we append that directory to the path
# to make our modules importable from the notebook's kernel.
sys.path.append("../src")

A few things to notice in the code chunk above:

+ Libraries are ordered from high-level to low-level libraries from the package manager (pip in this case, but could be conda, poetry, etc.)
+ The command `sys.path.append("../src")` adds the `../src` directory to the path in the Notebook's kernel. This way, we can use our modules as part of the notebook.
+ Local modules are imported at the end. 
+ The function `get_logger()` is called with `__name__` as recommended by the documentation.
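
The `get_logger()` function itself is not shown in this notebook. As a rough sketch of what such a helper could look like, the block below uses only the standard `logging` library. The function body and the `LOG_FORMAT` string are assumptions, modelled on the log lines that appear in this notebook's output, and are not the project's actual code.

```python
import logging

# Assumed format: timestamp, file name, line number, level, message.
LOG_FORMAT = "%(asctime)s, %(filename)s, %(lineno)d, %(levelname)s, %(message)s"


def get_logger(name, level=logging.INFO):
    """Return a logger with a single stream handler (hypothetical helper)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach the handler only once: calling this function again (for example,
    # after re-running a notebook cell) would otherwise duplicate every line.
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        logger.addHandler(handler)
    return logger


# Passing __name__ gives each module its own named logger.
_logs = get_logger(__name__)
_logs.info("Logger ready.")
```

The `if not logger.handlers` guard is a design choice: `logging.getLogger(name)` returns the same object for the same name, so adding a handler on every call would print each message several times.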

Now, to download the historical price data for a stock, we could use:

In [3]:
# Look at the data for a single stock.

# yf.Ticker is a class: an abstract description of something with certain behaviour
# that bundles attributes (data) and methods (functions).
# An object is one concrete instance of a class; here, `stock` is the Ticker object for AAPL.
stock = yf.Ticker("AAPL")

# The object has behaviour: we can ask it for its price history.
# Dates are written as YYYY-MM-DD (ISO 8601), which is unambiguous.
px_dt = stock.history(start = "2013-12-01", end = "2024-02-01")
# The Ticker object holds the ticker symbol ("AAPL");
# the start and end dates are arguments of the history() call.

px_dt
# px_dt.columns
# px_dt.shape
# px_dt.head
# px_dt.index

Date,Open,High,Low,Close,Volume,Dividends,Stock Splits
2013-12-02 00:00:00-05:00,17.448939,17.646880,17.224417,17.237238,472544800,0.0,0.0
2013-12-03 00:00:00-05:00,17.458327,17.710993,17.438939,17.709116,450968000,0.0,0.0
2013-12-04 00:00:00-05:00,17.683478,17.798866,17.537132,17.667843,377809600,0.0,0.0
2013-12-05 00:00:00-05:00,17.907056,17.984919,17.711928,17.758520,447580000,0.0,0.0
2013-12-06 00:00:00-05:00,17.692545,17.722565,17.498044,17.512114,344352400,0.0,0.0
...,...,...,...,...,...,...,...
2024-01-25 00:00:00-05:00,194.971211,196.019876,192.863900,193.922546,54822100,0.0,0.0
2024-01-26 00:00:00-05:00,194.022423,194.511788,191.695390,192.174774,44594000,0.0,0.0
2024-01-29 00:00:00-05:00,191.765299,191.955059,189.338403,191.485657,47145600,0.0,0.0
2024-01-30 00:00:00-05:00,190.696667,191.555572,187.231088,187.800354,55859400,0.0,0.0


## Parametrize the download

+ In the "AAPL" example we worked with a single stock, but we want to work with larger datasets, so we need to parametrize the download to handle many stocks (for example, 500) at a time.
+ Generally, we will look to separate every parameter and setting from functions.
+ If we had a few stocks, we could cycle through them. We need a place to store the list of tickers (a db or file, for example).
+ Store a csv file with a few stock tickers. The location of the file is a setting, the contents of this file are parameters.
+ Use **environment variables** to pass parameters.
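
A minimal sketch of reading a setting from the environment follows. The helper name `get_setting` is hypothetical, and the value assigned to `TICKERS` is for illustration only; in this notebook the real value is loaded from `../src/.env`.

```python
import os


def get_setting(name, default=None):
    """Read a setting from the environment (hypothetical helper).

    Raises a clear error when a required setting is missing instead of
    passing None further down the pipeline.
    """
    value = os.getenv(name, default)
    if value is None:
        raise KeyError(f"Environment variable {name} is not set.")
    return value


# Illustrative value only; setdefault keeps any value that is already set.
os.environ.setdefault("TICKERS", "../data/tickers/sp500_wiki.csv")
ticker_file = get_setting("TICKERS")
print(ticker_file)
```

Failing early on a missing setting is one possible design; the alternative is to supply a default, as in `get_setting("TEMP_DATA", "../data/temp/")`, where the default path is again only an example.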

In [4]:
# Environment variable names are written in capital letters (TICKERS) by convention.
# Nothing enforces the convention, but ignoring it makes the code harder to read.
ticker_file = os.getenv("TICKERS")

# reading the first 20 rows of the S&P500_wiki CSV into Pandas
tickers = pd.read_csv(ticker_file).head(20)
tickers

Unnamed: 0,ticker,Security,GICS Sector,GICS Sub-Industry,Headquarters Location,Date added,CIK,Founded
0,MMM,3M,Industrials,Industrial Conglomerates,"Saint Paul, Minnesota",20883,66740,1902
1,AOS,A. O. Smith,Industrials,Building Products,"Milwaukee, Wisconsin",42942,91142,1916
2,ABT,Abbott,Health Care,Health Care Equipment,"North Chicago, Illinois",20883,1800,1888
3,ABBV,AbbVie,Health Care,Biotechnology,"North Chicago, Illinois",41274,1551152,2013 (1888)
4,ACN,Accenture,Information Technology,IT Consulting & Other Services,"Dublin, Ireland",40730,1467373,1989
5,ADBE,Adobe Inc.,Information Technology,Application Software,"San Jose, California",35555,796343,1982
6,AMD,Advanced Micro Devices,Information Technology,Semiconductors,"Santa Clara, California",42814,2488,1969
7,AES,AES Corporation,Utilities,Independent Power Producers & Energy Traders,"Arlington, Virginia",36070,874761,1981
8,AFL,Aflac,Financials,Life & Health Insurance,"Columbus, Georgia",36308,4977,1955
9,A,Agilent Technologies,Health Care,Life Sciences Tools & Services,"Santa Clara, California",36682,1090872,1999


Collecting pandas DataFrames

+ From the [documentation](https://pandas.pydata.org/docs/user_guide/merging.html):

> [`concat()`](https://pandas.pydata.org/docs/reference/api/pandas.concat.html#pandas.concat) makes a full copy of the data, and iteratively reusing `concat()` can create unnecessary copies. Collect all DataFrame or Series objects in a list before using `concat()`.

+ We can string operations together using dot notation (method chaining). Enclose the expression in parentheses and add line breaks for readability.

In [None]:
# DON'T RUN
for k, row in tickers.iterrows():
    print(f'This is the {k}th row.')
    print(f'The row is {row}')

In [5]:
# List to hold final results
px_list = list()

for k, row in tickers.iterrows():  # Produces an iterator that returns index and row

    print(f"The ticker is {row['ticker']}\n\n")

    stock = yf.Ticker(row['ticker'])
    print(f'Processing {row["ticker"]}')
    
    # The round parentheses let one chained expression span multiple lines.
    px = (stock
          # First operation: download the price history.
          .history(start = pd.to_datetime("2013-12-01"), 
                   end = pd.to_datetime("2024-02-01"))

          # The dates are in the index; reset_index() turns Date into a regular column.
          .reset_index()

          # Add a 'ticker' column (it is appended as the last column).
          # We need this label because we are going to download many tickers
          # and must be able to tell their rows apart.
          .assign(ticker = row['ticker']))
    
    # Known condition: a ticker may return a DataFrame with no rows.
    # shape[0] is the number of rows; if it is zero, alert the user instead of failing silently.
    # `continue` skips the rest of this iteration, so px_list.append(px) is not executed,
    # and the loop moves on to the next ticker.
    if px.shape[0] == 0:
        print(f'No data for {row["ticker"]}')  # Validate: do not fail silently.
        continue
    # Append this ticker's DataFrame to the list created at the top.
    px_list.append(px)
    print(px.head(5))    

The ticker is MMM


Processing MMM
                       Date       Open       High        Low      Close  \
0 2013-12-02 00:00:00-05:00  93.398053  93.632134  90.362102  90.567810   
1 2013-12-03 00:00:00-05:00  89.418679  90.362094  88.773187  89.801720   
2 2013-12-04 00:00:00-05:00  89.340638  90.503946  89.014345  89.702400   
3 2013-12-05 00:00:00-05:00  89.659853  90.546521  89.532172  89.964867   
4 2013-12-06 00:00:00-05:00  90.929582  91.362276  90.688405  91.227501   

    Volume  Dividends  Stock Splits ticker  
0  6897900        0.0           0.0    MMM  
1  7864500        0.0           0.0    MMM  
2  3450700        0.0           0.0    MMM  
3  2842700        0.0           0.0    MMM  
4  2886100        0.0           0.0    MMM  
The ticker is AOS


Processing AOS
                       Date       Open       High        Low      Close  \
0 2013-12-02 00:00:00-05:00  23.161834  23.264453  22.918116  23.020735   
1 2013-12-03 00:00:00-05:00  22.892464  23.042115  22.61026

In [6]:
# Concatenate: stack the DataFrames one on top of the other.
# To concatenate simply means to combine or join together.
# Think of a layer cake built layer by layer:
# every layer has the same schema (Date, Open, High, Low, Close, ...).
px_dt = pd.concat(px_list, axis = 0)
# There are two ways of putting DataFrames together:
# axis = 0 stacks them by rows (one on top of the other),
# axis = 1 places them side by side (by columns).

# px_dt now contains all of our price information,
# and the 'ticker' column identifies the rows of each stock.
px_dt

# SUMMARY
# 1. We built the solution for a single stock.
# 2. We got a list of stocks that we want to run the solution for.
# 3. We iterated the single-stock solution over the list of stocks.
# 4. We collected all the individual datasets into a single large dataset.
# This pattern (start with an empty list, append to it in a loop, concatenate once at the end)
# is the standard way of joining many datasets.
# When you get many CSV files that belong together: create a list, append each DataFrame to it,
# and finally combine them with pd.concat().

# Report the shape of the combined DataFrame (px_dt), not of the last ticker (px).
print(f'Downloaded, final shape {px_dt.shape}.')


Downloaded, final shape (49391, 9).


## Reliability

+ Kleppmann (2017) defines *reliability* as:

    - A system should continue to work correctly. 
    - To work correctly means performing the correct function at the desired level of performance, even in the face of adversity such as hardware or software faults, and even human error. 

+ *Faults* are things that can go wrong.
+ Systems that can cope with (certain types of) faults are called *fault-tolerant* or *resilient*.
+ A fault is different from a failure. 
    
    - A *fault* occurs when a component of the system deviates from spec.
    - A *failure*  is when the system stops providing the required service to the user.

+ In our simple example, we handle the fault that occurs when a ticker returns no data: we report it (a printed message in the loop above, a *warning* entry when a logger is used) and move on to the next ticker.
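
The idea can be sketched without any download at all. In the block below, `collect_prices` and its `fetch` argument are hypothetical stand-ins for the download step, and the toy dictionary replaces Yahoo Finance; the point is only that a fault in one ticker is logged as a warning and does not become a failure of the whole run.

```python
import logging

logging.basicConfig(level=logging.INFO)
_logs = logging.getLogger(__name__)


def collect_prices(tickers, fetch):
    """Call fetch(ticker) for each ticker and keep only non-empty results.

    `fetch` is a hypothetical stand-in for the download step: it returns a
    list of rows (possibly empty) or raises an exception.
    """
    collected, skipped = {}, []
    for ticker in tickers:
        try:
            rows = fetch(ticker)
        except Exception as exc:  # a fault in one ticker should not stop the run
            _logs.warning("Download failed for %s: %s", ticker, exc)
            skipped.append(ticker)
            continue
        if not rows:
            _logs.warning("No data for %s", ticker)  # do not fail silently
            skipped.append(ticker)
            continue
        collected[ticker] = rows
    return collected, skipped


# Toy data in place of a real download: BBB is empty, CCC is unknown.
fake_source = {"AAA": [1.0, 1.1], "BBB": []}
prices, skipped = collect_prices(["AAA", "BBB", "CCC"], lambda t: fake_source[t])
print(prices, skipped)
```

Returning the list of skipped tickers, rather than only logging them, lets the caller decide whether to retry or to stop.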


# Storing Data in CSV



+ We have some data. How do we store it?
+ We can compare two options, CSV and Parquet, by measuring their performance:

    - Time to save.
    - Space required.

In [7]:
# Function that returns the total size of all the files in a directory.

def get_dir_size(path='.'):
    '''Returns the total size (in bytes) of the files contained in path.'''
    total = 0

    # os.scandir yields every entry (file or subdirectory) found in path.
    # The function calls itself on each subdirectory (recursion), so it traverses
    # the whole directory tree and adds up the sizes of all the files.
    # We use it to measure how much space each of our file formats takes.
    with os.scandir(path) as it:
        # for every entry in the path... 
        for entry in it:
            # if it is a file, get its size and add it to total
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total

In [8]:
import time

In [9]:
# Set up the output location for the timing tests.

# TEMP is an environment variable already used by Windows, so our variable is named TEMP_DATA.
temp = os.getenv("TEMP_DATA")

# IMPORTANT
# Writing a file into a directory that does not exist raises an error, so create the directory first.
# With exist_ok=True, makedirs does nothing if the directory already exists;
# with exist_ok=False (the default), it raises FileExistsError in that case.
os.makedirs(temp, exist_ok=True)

# Build the path of the CSV file by joining the temp directory and the file name.
stock_path = os.path.join(temp, "stock_px.csv")

In [10]:
# We first write the data to CSV and then to Parquet, timing each write.

# Time snapshot before writing.
start = time.time()

# Write the DataFrame to CSV without the index.
px_dt.to_csv(stock_path, index = False)

# Time snapshot after writing.
end = time.time()

# 'end - start' is the time that the operation took, in seconds.
print(f'Writing to dt ({px_dt.shape})csv took {end - start} seconds.')
# Get data size of file
print(f'Csv file size { os.path.getsize(stock_path)*1e-6 } MB')

Writing to dt ((49391, 9))csv took 1.4060029983520508 seconds.
Csv file size 5.904595 MB
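
The start/stop pattern above is repeated for every format we test, so it can be wrapped in a small helper. This is only a sketch: `timed_write` and `write_demo_csv` are hypothetical names, the toy CSV stands in for the price data, and `time.perf_counter()` is used here instead of `time.time()` because it is intended for measuring short durations.

```python
import csv
import os
import tempfile
import time


def timed_write(write_fn, path):
    """Run write_fn(path); return (elapsed seconds, file size in MB)."""
    start = time.perf_counter()
    write_fn(path)
    elapsed = time.perf_counter() - start
    size_mb = os.path.getsize(path) * 1e-6
    return elapsed, size_mb


def write_demo_csv(path):
    # Toy rows standing in for the price DataFrame.
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["ticker", "close"])
        writer.writerows([["AAA", 1.0], ["BBB", 2.0]])


with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.csv")
    elapsed, size_mb = timed_write(write_demo_csv, demo_path)
    print(f"Writing took {elapsed:.4f} seconds, file size {size_mb:.6f} MB")
```

In the notebook, the CSV write could then be timed with `timed_write(lambda p: px_dt.to_csv(p, index=False), stock_path)`. Note that `os.path.getsize` measures a single file; a dataset written as a directory of files needs a recursive helper such as `get_dir_size`.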


## Save Data to Parquet

### Dask 

We can work with large datasets and parquet files. In fact, recent versions of pandas support pyarrow data types, and there have been proposals to make pyarrow a required dependency in future versions. The pyarrow library is the Python interface to the Apache Arrow project. The [parquet data format](https://parquet.apache.org/) and [Arrow](https://arrow.apache.org/docs/python/parquet.html) are projects of the Apache Software Foundation.

However, Dask is much more than an interface to Arrow: Dask provides parallel and distributed computing on pandas-like dataframes. It is also relatively easy to use, bridging a gap between pandas and Spark. 

In [11]:
import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

In [12]:
# Dask is a library that scales the pandas API: it gives us parallel computation
# and convenient access to partitioned Parquet datasets.
# If a problem is too big for pandas to handle in memory, first try Dask;
# if it is much bigger, use Spark.
# Dask is an intermediate solution before bringing out big data tools and clusters.
# Dask lets you work on bigger DataFrames because it parallelizes a fair bit of its computations.
# pandas' to_parquet writes a single file by default; Dask writes one file per partition.
# Parquet files are immutable, so we need a layout that lets us replace parts of the data:
# we use the directory and file names as a namespace,
# with a directory per stock and a file per year.

px_dd = dd.from_pandas(px_dt, npartitions = len(tickers))

# Build the Parquet path by joining 'temp' with the directory name 'stock_px.parquet'.
parquet_path = os.path.join(temp, "stock_px.parquet")

# Start the timer, write, then stop the timer.
start = time.time()
px_dd.to_parquet(parquet_path, engine = "pyarrow")
end = time.time()

# In this run, writing to Parquet took about one eighth of the time of the CSV write
# and the result used less than half the space (about 2.2 MB vs 5.9 MB).
print(f'Writing dd ({px_dt.shape}) to parquet took {end - start} seconds.')
print(f'Parquet file size { get_dir_size(parquet_path)*1e-6 } MB')

Writing dd ((49391, 9)) to parquet took 0.1735854148864746 seconds.
Parquet file size 2.241513 MB


### Parquet files and Dask Dataframes

+ Parquet files are immutable: once written, they cannot be modified.
+ Dask DataFrames are a useful tool for manipulating data stored in parquet files.
+ Parquet and Dask are not the same: parquet is a file format that can be accessed by many applications and programming languages (Python, R, PowerBI, etc.), while Dask is a package in Python to work with large datasets using distributed computation.
+ **Dask is not for everything** (see [Dask DataFrames Best Practices](https://docs.dask.org/en/stable/dataframe-best-practices.html)). 

    - Consider cases such as small-to-large joins, where the small dataframe fits in memory, but the large one does not. 
    - If possible, use pandas: reduce the data with Dask first, then continue in pandas.
    - Pandas performance tips apply to Dask.
    - Use the index: it is beneficial to have a well-defined index in Dask DataFrames, as it may speed up searching (filtering) the data. Only a one-dimensional index is supported.
    - Avoid (or minimize) full-data shuffling: setting an index is an expensive operation. 
    - Some joins are more expensive than others. 

        * Not expensive:

            - Join a Dask DataFrame with a pandas DataFrame.
            - Join a Dask DataFrame with another Dask DataFrame of a single partition.
            - Join Dask DataFrames along their indexes.

        * Expensive:

            - Join Dask DataFrames along columns that are not their index.


# How do we store prices?

+ We can store our data as a single blob. This can be difficult to maintain, especially because parquet files are immutable.
+ Strategy: organize data files by ticker and date. Update only the most recent period (in the code below, the current year).
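
The strategy can be expressed as two small functions. This is a sketch under assumptions: `price_path` mirrors the `<ticker>/<ticker>_<year>.parquet` layout used in this notebook, while `needs_download` is a hypothetical rule (always refresh the current year, fetch older years only when their file is missing) and is not taken from the project code.

```python
import os


def price_path(root, ticker, year):
    """Build <root>/<ticker>/<ticker>_<year>.parquet."""
    return os.path.join(root, ticker, f"{ticker}_{year}.parquet")


def needs_download(root, ticker, year, current_year):
    """Hypothetical rule: always refresh the current year; fetch older
    years only if their file is missing."""
    if year >= current_year:
        return True
    return not os.path.exists(price_path(root, ticker, year))


# The file name itself carries the ticker and the year.
print(price_path("prices", "AAPL", 2023))
```

Because the ticker and the year are encoded in the path, deciding what to download only requires checking which files exist; no Parquet file has to be opened.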



In [13]:
# Parquet files are binary, the way to open them is by using a Python Library and importing them.
# Python is not the only software, you can point your PowerBI instance to the directory and it will
# read Parquet and display all your price information with the ticker and you can do charts.
# You can point R to the directory and it will display your information. 

# With this loop, I want to store prices in a way that I can maintain over the long term. 
# Remember: Parquet files are immutable; once we write them, we cannot append to them.
# We need a strategy, and that strategy relates to namespaces:
# I am going to use the names of my files and the folder structure to organize the data.
# I am going to create a directory per stock and a Parquet file per year.
# This way I only need to download the older years once, and
# I can rewrite the current year every time that I want to update my data.
# When I switch over to a new year, the previous year becomes historical.

PRICE_DATA = os.getenv("PRICE_DATA")

# Outer loop: iterate over tickers.
for ticker in px_dt.ticker.unique():
    # Subset the DataFrame to the rows of this ticker.
    ticker_dt = px_dt[px_dt.ticker == ticker]
    # Add a 'year' column derived from the date.
    ticker_dt = ticker_dt.assign(year = ticker_dt.Date.dt.year)
    # Inner loop: iterate over the years of this ticker.
    for yr in ticker_dt.year.unique():
        yr_dt = ticker_dt[ticker_dt.year == yr]
        yr_path = os.path.join(PRICE_DATA, ticker, f"{ticker}_{yr}.parquet")
        
        # Create the ticker directory if it does not already exist.
        # os.path.join and os.path.dirname build paths that are valid
        # on Linux, Windows and macOS.
        os.makedirs(os.path.dirname(yr_path), exist_ok=True)
        # Write this ticker-year directly to Parquet.
        yr_dt.to_parquet(yr_path, engine = "pyarrow")

# SUMMARY
# What I end up with is a 'prices' directory with one subdirectory per stock and one Parquet file per year.
# I have brought order to chaos: I started with a large blob of data, cut it up, and put each piece in the right place.
# If this data is in an assured log storage, I can retain it.
# I am making my code Generalizable, Modularizable, and more Fault Tolerant.
    

In [14]:
# pandas lets you select a column in two ways:
# px_dt.Date or px_dt['Date']

# A single column is called a Series in pandas.
px_dt['Date']

# For datetime columns, the .dt accessor exposes the date parts:
# px_dt['Date'].dt.year, px_dt['Date'].dt.month, or px_dt['Date'].dt.day.
# The two lines below do the same thing: both add a 'year' column at the end of the DataFrame.
# The second form (assign) can also be used inside a method chain.
px_dt['year'] = px_dt['Date'].dt.year
px_dt = px_dt.assign(year = px_dt.Date.dt.year)

In [29]:
px_dt

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker,year
0,2013-12-02 00:00:00-05:00,93.398053,93.632134,90.362102,90.567810,6897900,0.0,0.0,MMM,2013
1,2013-12-03 00:00:00-05:00,89.418679,90.362094,88.773187,89.801720,7864500,0.0,0.0,MMM,2013
2,2013-12-04 00:00:00-05:00,89.340638,90.503946,89.014345,89.702400,3450700,0.0,0.0,MMM,2013
3,2013-12-05 00:00:00-05:00,89.659853,90.546521,89.532172,89.964867,2842700,0.0,0.0,MMM,2013
4,2013-12-06 00:00:00-05:00,90.929582,91.362276,90.688405,91.227501,2886100,0.0,0.0,MMM,2013
...,...,...,...,...,...,...,...,...,...,...
2553,2024-01-25 00:00:00-05:00,150.070007,153.050003,149.539993,151.869995,29149100,0.0,0.0,GOOGL,2024
2554,2024-01-26 00:00:00-05:00,151.100006,152.539993,151.009995,152.190002,26115500,0.0,0.0,GOOGL,2024
2555,2024-01-29 00:00:00-05:00,152.059998,153.779999,151.429993,153.509995,27784300,0.0,0.0,GOOGL,2024
2556,2024-01-30 00:00:00-05:00,152.800003,153.619995,151.190002,151.460007,36331800,0.0,0.0,GOOGL,2024


Why would we want to store data this way?

+ Easier to maintain. We do not update old data, only recent data.
+ We can also access all files as follows.

# Load, Transform and Save 

## Load

+ Parquet files can be read individually or as a collection.
+ `dd.read_parquet()` can take a list (collection) of files as input.
+ Use `glob` to get the collection of files.
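
To see what the wildcard pattern matches, the sketch below recreates a tiny version of the directory layout in a temporary folder. The tickers `AAA` and `BBB` are made up, and the files are empty placeholders rather than real Parquet data.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Recreate the <ticker>/<ticker>_<year>.parquet layout with empty files.
    for ticker, year in [("AAA", 2023), ("AAA", 2024), ("BBB", 2024)]:
        os.makedirs(os.path.join(root, ticker), exist_ok=True)
        open(os.path.join(root, ticker, f"{ticker}_{year}.parquet"), "w").close()

    # "*" matches any ticker directory; "*.parquet" matches every yearly file.
    all_files = sorted(glob(os.path.join(root, "*", "*.parquet")))
    # Narrow the pattern to collect the files of a single ticker.
    bbb_files = glob(os.path.join(root, "BBB", "*.parquet"))

    names = [os.path.basename(f) for f in all_files]
    print(names)
```

`glob` returns paths in arbitrary order, which is why the list is sorted before use here.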

In [15]:
# This demonstrates how to read back the price data that I have stored.
from glob import glob

# This is giving me a path with wildcards. Looking for all files with this structure.
# From the /data/prices directory, it will grab all files ending in .parquet.
# os.path.join(PRICE_DATA, "*/*.parquet")
parquet_files = glob(os.path.join(PRICE_DATA, "*/*.parquet"))

# parquet_files = glob(PRICE_DATA+"/**/*.parquet/*.parquet")
# 'dd' stands for Dask DataFrame.
# When this line runs, Dask does not load the data: like other big data libraries, it uses lazy execution,
# meaning that no command executes until it is ultimately necessary. The reason is that we want to verify
# that our code is well-formed and leave as much of the execution as possible until the very end, because
# once it runs we might need to wait a few hours until it completes.
dd_px = dd.read_parquet(parquet_files)
# dd_px = dd.read_parquet(parquet_files).set_index("ticker")

In [16]:
# To trigger the execution, I must call 'compute'.
# This materializes my Dask DataFrame as a pandas DataFrame.
# To do it for a single ticker:  dd_px[dd_px.ticker == "ABBV"].compute()  
dd_px.compute()

# SUMMARY
# By calling 'compute', all the Parquet files have been read and combined into
# a single pandas DataFrame. Nothing needs to be joined by hand.
# I have gone from a list of files (one per ticker and year) to a single DataFrame in a single command.
# This is the power of Parquet: it allows me to break a very large dataset into individual components
# that can be managed individually; when I need to read them, I simply point to all of them
# at the same time and bring them back.

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker,year
0,2013-12-02 00:00:00-05:00,35.052676,35.183785,34.816674,34.882229,2039962,0.0,0.0,A,2013
1,2013-12-03 00:00:00-05:00,34.692114,34.856006,34.456113,34.698669,3462706,0.0,0.0,A,2013
2,2013-12-04 00:00:00-05:00,34.639674,35.288678,34.561009,35.124786,3377288,0.0,0.0,A,2013
3,2013-12-05 00:00:00-05:00,34.974012,35.269013,34.823234,35.072346,2530939,0.0,0.0,A,2013
4,2013-12-06 00:00:00-05:00,35.242803,36.009803,35.242803,35.944248,4268513,0.0,0.0,A,2013
...,...,...,...,...,...,...,...,...,...,...
2553,2024-01-25 00:00:00-05:00,92.247775,94.706929,92.080554,94.411835,6122900,0.0,0.0,MMM,2024
2554,2024-01-26 00:00:00-05:00,94.647914,95.316805,94.224940,94.421669,3720200,0.0,0.0,MMM,2024
2555,2024-01-29 00:00:00-05:00,94.431509,95.306967,93.880661,94.805298,3800000,0.0,0.0,MMM,2024
2556,2024-01-30 00:00:00-05:00,94.598729,94.933178,93.231440,94.185593,3200500,0.0,0.0,MMM,2024


## Transform

+ This transformation step will create a *Features* data set. In our case, features will be stock returns (we obtained prices).
+ Dask dataframes work like pandas dataframes: in particular, we can perform groupby and apply operations.
+ Notice the use of [an anonymous (lambda) function](https://realpython.com/python-lambda/) in the apply statement.
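
The logic of the transformation can be traced by hand in plain Python, without pandas or Dask. The rows below are invented for illustration and are assumed to be sorted by date within each ticker; the sketch groups by ticker, carries the previous close forward as the lag, and computes the return and its sign.

```python
from collections import defaultdict

# Toy (ticker, date, close) rows, already sorted by date within each ticker.
rows = [
    ("AAA", "2024-01-02", 100.0),
    ("AAA", "2024-01-03", 110.0),
    ("AAA", "2024-01-04", 99.0),
    ("BBB", "2024-01-02", 50.0),
    ("BBB", "2024-01-03", 50.0),
]

# Group closes by ticker so the lag never crosses from one stock to another.
by_ticker = defaultdict(list)
for ticker, date, close in rows:
    by_ticker[ticker].append((date, close))

features = []
for ticker, series in by_ticker.items():
    prev_close = None  # the first row of each group has no lagged price
    for date, close in series:
        ret = None if prev_close is None else close / prev_close - 1
        positive = int(ret is not None and ret > 0)
        features.append((ticker, date, close, prev_close, ret, positive))
        prev_close = close

for row in features:
    print(row)
```

The first row of each ticker has no lagged price, so its return is missing and its `positive` flag is 0, which matches the behaviour of `shift(1)` followed by the comparison in the Dask version.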

In [21]:
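# This is a split-apply-combine operation (similar in spirit to map-reduce).
# Calculate returns: take the price series, group it by ticker, take the lagged price, then calculate the ratio
# of today's closing price to the previous closing price, minus one.
# Note: shift(1) follows the row order within each group, so the rows of each ticker must be in date order.

import numpy as np

# Concentrate on the groupby and the lambda: they are the most difficult part of the assignment.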
dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
).assign(
    returns = lambda x: x['Close']/x['Close_lag_1'] - 1
).assign(
    positive_return = lambda x: (x['returns'] > 0)*1
))

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_rets = (dd_px.groupby('ticker', group_keys=False).apply(


In [22]:
type(dd_rets)

dask_expr._collection.DataFrame

In [24]:
dd_rets.head(5)



Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker,year,Close_lag_1,returns,positive_return


## Lazy Execution

What does `dd_rets` contain?

In [18]:
dd_rets

<dask_expr.expr.DataFrame: expr=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=Assign(frame=GroupByApply(frame=ReadParquet(9c4803f), observed=False, group_keys=False, func=<function <lambda> at 0x000001FD3423D940>, meta=_NoDefault.no_default, args=(), kwargs={})))))))>

+ Dask is a lazy execution framework: commands will not execute until they are required. 
+ To trigger an execution in dask use `.compute()`.
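
Python generators offer a small-scale analogy for lazy execution. The sketch below is not Dask code and does not describe how Dask works internally; the function names are made up. It only shows the same idea: building the pipeline does no work, and the work happens when the result is requested.

```python
executed = []  # records when work actually happens


def load(values):
    for v in values:
        executed.append(f"load {v}")
        yield v


def double(stream):
    for v in stream:
        executed.append(f"double {v}")
        yield v * 2


# Building the pipeline runs nothing: generators only describe the work.
pipeline = double(load([1, 2, 3]))
steps_before = len(executed)

# Consuming the pipeline plays the role of .compute(): now the work runs.
result = list(pipeline)
steps_after = len(executed)

print(steps_before, result, steps_after)
```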

In [19]:
dd_rets.compute()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits,ticker,year,Close_lag_1,returns,positive_return
2537,2024-01-02 00:00:00-05:00,153.434029,158.624444,153.305270,158.307480,8059900,0.0,0.0,ABBV,2024,,,0
2538,2024-01-03 00:00:00-05:00,159.070181,160.219206,158.327279,158.941422,5301400,0.0,0.0,ABBV,2024,158.307480,0.004004,1
2539,2024-01-04 00:00:00-05:00,158.792836,160.754093,157.921159,159.931961,8332200,0.0,0.0,ABBV,2024,158.941422,0.006232,1
2540,2024-01-05 00:00:00-05:00,159.803183,161.487091,159.089996,160.605515,5607600,0.0,0.0,ABBV,2024,159.931961,0.004212,1
2541,2024-01-08 00:00:00-05:00,160.645139,161.229567,158.267858,159.902237,6001600,0.0,0.0,ABBV,2024,160.605515,-0.004379,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
772,2016-12-23 00:00:00-05:00,29.584957,29.601774,29.408378,29.492464,2147000,0.0,0.0,AFL,2016,29.589172,-0.003268,0
773,2016-12-27 00:00:00-05:00,29.496680,29.727909,29.442023,29.597580,1766600,0.0,0.0,AFL,2016,29.492464,0.003564,1
774,2016-12-28 00:00:00-05:00,29.521911,29.551340,29.303290,29.345335,3138600,0.0,0.0,AFL,2016,29.597580,-0.008522,0
775,2016-12-29 00:00:00-05:00,29.370555,29.484067,29.345330,29.357943,2776600,0.0,0.0,AFL,2016,29.345335,0.000430,1


## Save

+ Apply transformations to calculate daily returns
+ Store the enriched data, the silver dataset, in a new directory.
+ Should we keep the same namespace? All columns?

In [20]:
features_path = os.getenv("FEATURES_DATA")
dd_rets.to_parquet(features_path, engine = "pyarrow")

# A few notes

## Jupyter? 

+ We have drafted our code in a Jupyter Notebook. 
+ Finalized code should be written in Python modules.

## Object oriented programming?

+ We can use classes to keep parameters and functions together.
+ We *could* use Object Oriented Programming, but parallelization of data manipulation and modelling tasks benefits from *Functional Programming*.
+ An Idea: 

    - [Data Oriented Programming](https://blog.klipse.tech/dop/2022/06/22/principles-of-dop.html).
    - Use the class to bundle together parameters and functions.
    - Use stateless operations and treat all data objects as immutable (we do not modify them, we overwrite them).
    - Take advantage of [`@staticmethod`](https://realpython.com/instance-class-and-static-methods-demystified/).
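
As an illustration of these ideas, the sketch below bundles parameters and a stateless function in one class. `PipelineSketch` is a hypothetical example and is not the `DataManager` class in `./src/`; the dates are the ones used earlier in this notebook.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineSketch:
    """Hypothetical example; not the DataManager class in ./src/."""
    start_date: str = "2013-12-01"
    end_date: str = "2024-02-01"

    @staticmethod
    def add_returns(closes):
        """Stateless: builds a new list and leaves the input untouched."""
        if not closes:
            return []
        rets = [None]  # the first price has no previous close
        for prev, cur in zip(closes, closes[1:]):
            rets.append(cur / prev - 1)
        return rets

    def describe(self):
        # Instance methods read the parameters; they do not mutate them.
        return f"Prices from {self.start_date} to {self.end_date}"


closes = [100.0, 110.0, 99.0]
print(PipelineSketch.add_returns(closes))
print(PipelineSketch().describe())
```

`frozen=True` makes the parameters read-only after construction, and the static method depends only on its arguments, so it can be called (and parallelized) without an instance.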

The code is in `./src/`.

Our original design was:

![](./img/target_pipeline_manager.png)

Our resulting interface is:

In [30]:
# Don't run it all; delete your prices folder first.
# The DataManager class runs two procedures:
# one downloads all of our data, with a subprocess that processes each ticker,
# and one creates the features.
# We add logs for each procedure: every time something runs, we log it.
# Static methods do not depend on the state of the class: they take data in
# and return a standard output that depends only on the function.
from data_manager import DataManager
dm = DataManager()

In [31]:
dm.download_all()

2024-03-03 20:36:46,714, data_manager.py, 50, INFO, Getting price data for all tickers.
2024-03-03 20:36:46,714, data_manager.py, 50, INFO, Getting price data for all tickers.
2024-03-03 20:36:46,717, data_manager.py, 59, INFO, Getting tickers from ../data/tickers/sp500_wiki.csv
2024-03-03 20:36:46,717, data_manager.py, 59, INFO, Getting tickers from ../data/tickers/sp500_wiki.csv
2024-03-03 20:36:46,768, data_manager.py, 65, INFO, Processing all tickers
2024-03-03 20:36:46,768, data_manager.py, 65, INFO, Processing all tickers
2024-03-03 20:36:46,781, data_manager.py, 80, INFO, Processing ticker MMM
2024-03-03 20:36:46,781, data_manager.py, 80, INFO, Processing ticker MMM
2024-03-03 20:36:46,783, data_manager.py, 115, INFO, Getting stock price data for MMM from 2000-01-01 to 2024-03-03
2024-03-03 20:36:46,783, data_manager.py, 115, INFO, Getting stock price data for MMM from 2000-01-01 to 2024-03-03
[*********************100%%**********************]  1 of 1 completed
2024-03-03 20:36:

In [None]:
dm.featurize()