# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `production_2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.
%load_ext dotenv
%dotenv

In [2]:
# Standard-library imports
import os
import sys

# add SRC_DIR to sys.path to locate .py file to import
sys.path.append(os.getenv('SRC_DIR'))

# Import logger
from logger import get_logger
_logs = get_logger(__name__)

_logs.info('Logger loaded.')
_logs.info('Environment variables loaded.')

2024-06-28 10:49:45,138, 3324719616.py, 12, INFO, Logger loaded.
2024-06-28 10:49:45,139, 3324719616.py, 13, INFO, Environment variables loaded.


In [3]:
import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1pt)

In [4]:
import os
from glob import glob

# Write your code below.
PRICE_DATA = os.getenv('PRICE_DATA')

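# Unlike in the 02_data_engineering notebook, dd.read_parquet() raised an error here when given
# the <ticker>_<year>.parquet directories, so the pattern points at the part files inside them.
# The wildcard on the part number picks up every part file, not only part.0.

parquet_files = glob(os.path.join(PRICE_DATA, "*", "*.parquet", "part.*.parquet"))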

_logs.info('END - Load all parquet file path into parquet_files list.')


2024-06-28 10:49:48,051, 2291119893.py, 12, INFO, END - Load all parquet file path into parquet_files list.
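The layout `<ticker>/<ticker>_<year>.parquet/part.N.parquet` comes from the paths listed in cell `In [5]`. A Dask `to_parquet` write produces one `part.N.parquet` file per partition, so a dataset can contain more than `part.0`. The sketch below uses a hypothetical temporary directory to compare a pattern pinned to `part.0.parquet` with one that wildcards the part number:

```python
import os
import tempfile
from glob import glob
from pathlib import Path

# Recreate a tiny, hypothetical version of the PRICE_DATA layout:
#   <PRICE_DATA>/<ticker>/<ticker>_<year>.parquet/part.N.parquet
price_data = tempfile.mkdtemp()
for rel in ["A/A_2000.parquet/part.0.parquet",
            "A/A_2000.parquet/part.1.parquet",
            "MSI/MSI_2006.parquet/part.0.parquet"]:
    path = Path(price_data, rel)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.touch()

# Pattern pinned to part.0: silently misses any additional part files
only_first = glob(os.path.join(price_data, "*", "*.parquet", "part.0.parquet"))
# Wildcard on the part number: picks up every part file
all_parts = glob(os.path.join(price_data, "*", "*.parquet", "part.*.parquet"))

print(len(only_first), len(all_parts))
```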


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)
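Only the lags actually need per-ticker grouping. `returns` and `hi_lo_range` are plain row-wise arithmetic once the lag column exists. Below is a minimal pandas sketch on hypothetical data. It assumes rows must be in `Date` order within each ticker for `shift(1)` to mean "previous trading day". Unlike the notebook, which fills the first lag with the same day's price and so gives a first-day return of 0, this version leaves that row as NaN:

```python
import pandas as pd

# Hypothetical prices for two tickers, deliberately not in (ticker, Date) order
px = pd.DataFrame({
    "ticker": ["B", "A", "A", "B", "A"],
    "Date": pd.to_datetime(["2024-01-03", "2024-01-03", "2024-01-02",
                            "2024-01-02", "2024-01-04"]),
    "High": [11.5, 21.0, 20.5, 10.5, 22.0],
    "Low": [10.0, 19.0, 19.5, 9.5, 20.0],
    "Close": [11.0, 20.0, 20.0, 10.0, 21.0],
    "Adj Close": [11.0, 20.0, 20.0, 10.0, 21.0],
})

# Lags are only meaningful in trading-day order within each ticker
px = px.sort_values(["ticker", "Date"])
by_ticker = px.groupby("ticker")

# Grouping keeps one ticker's last day from leaking into the next ticker's first lag
px["Close_lag"] = by_ticker["Close"].shift(1)
px["Adj_Close_lag"] = by_ticker["Adj Close"].shift(1)

# Returns and range need no grouping once the lag exists
px["returns"] = px["Adj Close"] / px["Adj_Close_lag"] - 1
px["hi_lo_range"] = px["High"] - px["Low"]

print(px[["ticker", "Date", "Close_lag", "returns", "hi_lo_range"]])
```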

In [5]:
parquet_files

['../../05_src/data/prices\\A\\A_2000.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2001.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2002.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2003.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2004.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2005.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2006.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2007.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2008.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2009.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2010.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2011.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2012.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2013.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2014.parquet\\part.0.parquet',
 '../../05_src/data/prices\\A\\A_2015.pa

In [6]:
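# Read all parquet files in the parquet_files list.
# set_index("ticker") sorts and shuffles the data once; with ticker as a sorted index,
# the later per-ticker groupby operations can run faster than grouping on a plain column.
dd_px = dd.read_parquet(parquet_files).set_index("ticker")

_logs.info('Parquet files read.')

# check for missing data in the data set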
missing_values_count = dd_px.isna().sum()
missing_values_count.compute()

2024-06-28 10:49:49,702, 1967849700.py, 5, INFO, Parquet files read.


Date         0
Open         0
High         0
Low          0
Close        0
Adj Close    0
Volume       0
sector       0
subsector    0
year         0
dtype: int64

In [7]:
# Write your code below.

import numpy as np

# parquet layout from https://www.parquet-viewer.com/#parquet-online
# Date - datetime[ns]	Open - f64	High - f64	Low - f64	Close - f64	Adj Close - f64	Volume - i64	sector - str	subsector - str	year - i32	ticker - str

_logs.info('START - add variables.')

# add Close_lag: previous day's 'Close' within each ticker.
# shift(1) yields NaN on each ticker's first row; fillna replaces it with that row's own 'Close'.
dd_feat = dd_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag = x['Close'].shift(1).fillna(x['Close']))
    )

# add Adj_Close_lag: previous day's 'Adj Close' within each ticker.
# shift(1) yields NaN on each ticker's first row; fillna replaces it with that row's own 'Adj Close'.
dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Adj_Close_lag = x['Adj Close'].shift(1).fillna(x['Adj Close']))
    )

# add 'returns' variable: (Adj Close / Adj_Close_lag) - 1, which is 0 on each ticker's first row
dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(returns = ((x['Adj Close'] / x['Adj_Close_lag']) -1) )
    )

# add 'hi_lo_range' variable: the day's High minus Low
dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(hi_lo_range = (x['High'] - x['Low']) )
    )

_logs.info('FINISH - add variables.')


2024-06-28 10:50:54,903, 1195116688.py, 8, INFO, START - add variables.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_feat = dd_px.groupby('ticker', group_keys=False).apply(
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_feat = dd_feat.groupby('ticker', group_keys=False).apply(
2024-06-28 10:

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)

In [8]:
# Write your code below.
import pandas as pd

pd_feat = pd.DataFrame()

_logs.info('START - convert data frame to Pandas.')

# Convert the Dask data frame to a pandas data frame.
pd_feat = dd_feat.compute().reset_index()

_logs.info('FINISH - convert data frame to Pandas.')

2024-06-28 10:50:55,007, 1471120444.py, 6, INFO, START - convert data frame to Pandas.
2024-06-28 11:05:40,246, 1471120444.py, 11, INFO, FINISH - convert data frame to Pandas.


In [9]:
# Add a rolling average return calculation with a window of 10 days.

_logs.info('START - add rolling average.')

pd_feat['rolling_avg_return'] = (pd_feat.groupby(['ticker'])['returns']
                                 .rolling(window = 10, min_periods=1)
                                 .mean()
                                 .reset_index(level=0, drop=True)  # keep original row labels so values align with pd_feat
                                 )

_logs.info('FINISH - add rolling average.')


2024-06-28 11:05:40,315, 2942599117.py, 3, INFO, START - add rolling average.
2024-06-28 11:05:41,611, 2942599117.py, 11, INFO, FINISH - add rolling average.
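A grouped rolling mean returns a result indexed by (ticker, original row label), with tickers in sorted order. `pd_feat` starts with MSI, not A, so its rows are not in sorted ticker order. Calling `reset_index(drop=True)` on the grouped result discards the original labels, so the assignment aligns by position and values can land on another ticker's rows. Dropping only the ticker level, or using `transform`, keeps the alignment.

The sketch below uses hypothetical data and a window of 2 for brevity:

```python
import pandas as pd

# Hypothetical returns whose tickers are NOT in sorted order (as in pd_feat)
df = pd.DataFrame({
    "ticker": ["MSI", "MSI", "MSI", "A", "A"],
    "returns": [0.01, 0.03, 0.05, 0.10, 0.20],
})

grouped = df.groupby("ticker")["returns"].rolling(window=2, min_periods=1).mean()

# Wrong: labels reset to 0..n-1 in sorted-ticker order, so row 0 (MSI) gets A's value
df["misaligned"] = grouped.reset_index(drop=True)

# Right: drop only the ticker level; the original row labels drive the alignment
df["rolling_avg_return"] = grouped.reset_index(level=0, drop=True)

# Also right: transform returns values in the original row order
df["rolling_avg_return_t"] = (df.groupby("ticker")["returns"]
                                .transform(lambda s: s.rolling(window=2, min_periods=1).mean()))
print(df)
```

The `missing_values_count` check cannot catch this kind of error, because every row still receives a value.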


In [10]:
print(pd_feat.head(40))

   ticker       Date       Open       High        Low      Close  Adj Close  \
0     MSI 2006-01-03  92.532822  94.998749  91.481773  93.503021  70.679741   
1     MSI 2006-01-04  93.786003  95.120026  93.786003  94.594498  71.504837   
2     MSI 2006-01-05  95.079597  95.483849  94.190247  95.079597  71.871506   
3     MSI 2006-01-06  96.777451  98.919975  96.251923  98.394447  74.377220   
4     MSI 2006-01-09  98.394447  99.728477  97.222122  99.162521  74.957825   
5     MSI 2006-01-10  98.232750  98.394447  96.737022  96.939148  73.277153   
6     MSI 2006-01-11  97.626373  97.626373  96.009377  96.696602  73.093826   
7     MSI 2006-01-12  97.545525  97.747650  96.049797  96.454048  72.910461   
8     MSI 2006-01-13  96.979576  97.262550  96.171074  97.141273  73.429909   
9     MSI 2006-01-17  96.211502  97.100853  94.837051  95.079597  71.871506   
10    MSI 2006-01-18  93.583878  95.564697  93.583878  95.443428  72.146515   
11    MSI 2006-01-19  96.494476  98.677422  96.13065

In [11]:
# checking for missing information on the data set
missing_values_count = pd_feat.isnull().sum()
missing_values_count[0:16]

ticker                0
Date                  0
Open                  0
High                  0
Low                   0
Close                 0
Adj Close             0
Volume                0
sector                0
subsector             0
year                  0
Close_lag             0
Adj_Close_lag         0
returns               0
hi_lo_range           0
rolling_avg_return    0
dtype: int64

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
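    - No. Dask can calculate the moving average as well, for example by running a pandas rolling mean inside a per-ticker `groupby(...).apply(...)`, the same way the lag features were built.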

+ Would it have been better to do it in Dask? Why?
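    - Not for this data set. Pandas is easier to use and works well for data sets that fit into memory, which is the case here since `compute()` succeeded. For data sets larger than memory, Dask would be the better choice because it processes the data in partitions, in parallel, without loading everything at once.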
    - "For data that fits into RAM, pandas can often be faster and easier to use than Dask DataFrame." (https://docs.dask.org/en/stable/dataframe-best-practices.html).


(1 pt)

## Criteria

|Criteria|Complete|Incomplete|
|---------------------|----|----|
|Calculations         |Calculations were done correctly.|Calculations were not done correctly.|
|Explanation of answer|Answer was concise and explained the learner's reasoning in depth.|Answer was not concise and did not explain the learner's reasoning in depth.|

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `23:59 - 30/06/2024`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [X] Created a branch with the correct naming convention.
- [X] Ensured that the repository is public.
- [X] Reviewed the PR description guidelines and adhered to them.
- [X] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.