# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that the path to the price data is stored in the environment variable `PRICE_DATA`. If you have not created the data set yet, run the notebook `01_materials/labs/2_data_engineering.ipynb` first.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.
%load_ext dotenv
%dotenv


In [2]:
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [3]:
import os
from glob import glob
from datetime import datetime
import sys

# Write your code below.

sys.path.append(os.getenv('SRC_DIR'))
from utils.logger import get_logger
_logs = get_logger(__name__)


# Load the environment variable PRICE_DATA
PRICE_DATA = os.getenv('PRICE_DATA')
_logs.info(f"Loaded PRICE_DATA: {PRICE_DATA}")

# Using glob to find the path of all parquet files
parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive=True)

_logs.info(f"Found{len(parquet_files)} parquet files.")

# Verification: log the first few paths
_logs.info(f"First 5 parquet files:{parquet_files[:5]}")

2025-06-08 21:13:07,183, 608807953.py, 15, INFO, Loaded PRICE_DATA: ../../05_src/data/prices/
2025-06-08 21:13:07,269, 608807953.py, 20, INFO, Found3014 parquet files.
2025-06-08 21:13:07,270, 608807953.py, 23, INFO, First 5 parquet files:['../../05_src/data/prices/NXJ/NXJ_2007/part.0.parquet', '../../05_src/data/prices/NXJ/NXJ_2007/part.1.parquet', '../../05_src/data/prices/NXJ/NXJ_2009/part.0.parquet', '../../05_src/data/prices/NXJ/NXJ_2009/part.1.parquet', '../../05_src/data/prices/NXJ/NXJ_2008/part.0.parquet']
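
The `**` wildcard only descends into subdirectories when `recursive=True` is passed, and `glob` returns paths in no guaranteed order (note the 2007, 2009, 2008 sequence in the log above). The following is a minimal sketch, not the required solution: it builds a throwaway directory whose layout and ticker names are made up, guards against an unset `PRICE_DATA`, and sorts the result so the order is reproducible. The helper name `find_parquet_files` is hypothetical.

```python
import os
import tempfile
from glob import glob


def find_parquet_files(root):
    """Return all *.parquet paths under root, sorted for a reproducible order."""
    if not root:
        # os.getenv returns None when the variable is not set.
        raise ValueError("PRICE_DATA is not set; load the .env file first.")
    return sorted(glob(os.path.join(root, "**", "*.parquet"), recursive=True))


# Hypothetical layout: <root>/<ticker>/<ticker>_<year>/<file>
with tempfile.TemporaryDirectory() as root:
    for rel in [
        "AAA/AAA_2008/part.0.parquet",
        "AAA/AAA_2007/part.0.parquet",
        "BBB/BBB_2007/notes.txt",  # not a parquet file, should be ignored
    ]:
        path = os.path.join(root, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()

    found = find_parquet_files(root)
    relative = [os.path.relpath(p, root).replace(os.sep, "/") for p in found]

print(relative)
```

Passing the result of `os.getenv('PRICE_DATA')` to such a helper fails early with a readable message instead of a `TypeError` inside `os.path.join`.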


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [4]:
# Write your code below.

# Setting up ticker as index 
dd_px = dd.read_parquet(parquet_files).set_index("ticker")
_logs.info("Loaded parquet files into Dask Dataframe (dd_px).")
_logs.info(f"dd_px dtypes: {dd_px.dtypes}") # Check dtypes for verification

2025-06-08 21:13:15,986, 3430312111.py, 5, INFO, Loaded parquet files into Dask Dataframe (dd_px).
2025-06-08 21:13:15,987, 3430312111.py, 6, INFO, dd_px dtypes: Date          datetime64[ns]
Open                 float64
High                 float64
Low                  float64
Close                float64
Adj Close            float64
Volume               float64
source       string[pyarrow]
Year                   int32
dtype: object


In [5]:
# 'Date' should already be a datetime column; if it is not, convert it so that sorting and lag operations work correctly.
if 'Date' in dd_px.columns and dd_px['Date'].dtype != 'datetime64[ns]':
    dd_px['Date'] = dd_px['Date'].astype('datetime64[ns]')
    _logs.info("Converted 'Date' column to datetime64[ns].")
else:
    _logs.info("'Date' column is already datetime64[ns] type.")

2025-06-08 21:13:21,122, 1134874372.py, 6, INFO, 'Date' column is already datetime64[ns] type.
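
The check above compares the dtype against the literal `'datetime64[ns]'`, so it would also trigger for a datetime column stored at another resolution. A sketch of a resolution-agnostic check in plain pandas, on a made-up two-row frame, could look like this:

```python
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype

# Toy data (assumed for illustration): dates arrive as strings.
df = pd.DataFrame({"Date": ["2020-01-02", "2020-01-03"], "Close": [10.0, 10.5]})

# Convert only when the column is not already some datetime64 dtype.
if not is_datetime64_any_dtype(df["Date"]):
    df["Date"] = pd.to_datetime(df["Date"])

print(df["Date"].dtype)
```

This runs on a pandas frame; whether the same helper fits the Dask frame in this notebook is an assumption that would need checking.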


In [8]:
# Adding features on dd_px
_logs.info("Starting feature engineering")

def add_features(group):
    """Function to add features to each ticker group"""

    # Sort by date to ensure proper lag calculations
    group = group.sort_values('Date')

    # Add lag feature
    group = group.assign(
        Close_lag_1 = group['Close'].shift(1),
        Adj_Close_lag_1 = group['Adj Close'].shift(1)
    )

    # Add returns
    group = group.assign(
        returns = (group['Close']/group['Close_lag_1']) - 1
    )

    # Add hi_lo range
    group = group.assign(
        hi_lo_range = group['High'] - group['Low']
    )

    return group

2025-06-08 21:13:29,099, 501825442.py, 2, INFO, Starting feature engineering
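
To see what the three features look like on concrete numbers, here is a small sketch in plain pandas for a single ticker. The prices are made up and the rows are deliberately out of date order to show why sorting comes first.

```python
import pandas as pd

# Toy prices for one ticker (assumed values), intentionally unsorted.
toy = pd.DataFrame({
    "Date": pd.to_datetime(["2020-01-03", "2020-01-02", "2020-01-06"]),
    "High": [11.0, 10.5, 12.5],
    "Low": [10.0, 9.5, 11.0],
    "Close": [11.0, 10.0, 12.1],
})

# Sort first so that shift(1) really means "previous trading day".
toy = toy.sort_values("Date")
toy = toy.assign(Close_lag_1=toy["Close"].shift(1))
toy = toy.assign(
    returns=toy["Close"] / toy["Close_lag_1"] - 1,
    hi_lo_range=toy["High"] - toy["Low"],
)

print(toy[["Date", "Close", "Close_lag_1", "returns", "hi_lo_range"]])
```

The first row has no previous close, so `Close_lag_1` and `returns` are `NaN` there; the other two rows each show a return of about 10%.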


In [9]:
# Get a small sample to create proper meta specification
_logs.info("Creating meta specification")
sample_data = dd_px.head(100).reset_index()
sample_ticker = sample_data['ticker'].iloc[0]
sample_group = sample_data[sample_data['ticker']==sample_ticker]
sample_with_features = add_features(sample_group)

# Create meta dictionary based on sample
meta_dict = {col:sample_with_features[col].dtype for col in sample_with_features.columns}
_logs.info(f"Meta specification created with {len(meta_dict)} columns")

2025-06-08 21:13:32,737, 2020478666.py, 2, INFO, Creating meta specification
2025-06-08 21:13:36,879, 2020478666.py, 10, INFO, Meta specification created with 14 columns
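
The `meta` argument tells Dask the column names and dtypes of the result without running the function on the full data. Besides a dict of dtypes, Dask also accepts an empty DataFrame as `meta`. The sketch below uses plain pandas and a made-up sample to derive both forms; `add_lag` is a simplified, hypothetical stand-in for `add_features`.

```python
import pandas as pd

# Toy sample (assumed values) standing in for dd_px.head(...).reset_index().
sample = pd.DataFrame({
    "ticker": ["AAA", "AAA"],
    "Date": pd.to_datetime(["2020-01-02", "2020-01-03"]),
    "Close": [10.0, 11.0],
})


def add_lag(group):
    """Simplified stand-in for add_features: adds one lag column."""
    group = group.sort_values("Date")
    return group.assign(Close_lag_1=group["Close"].shift(1))


# A zero-row slice keeps column names and dtypes but no data.
meta_frame = add_lag(sample).iloc[:0]

# The dict form used in the notebook can be read off the same frame.
meta_dict = dict(meta_frame.dtypes)

print(list(meta_dict))
```

Either `meta_frame` or `meta_dict` describes the same output schema; which one to pass is a matter of preference.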


In [10]:
# The meta specification was built successfully from the sample, so we can now apply the group function to the whole data set
_logs.info("Applying feature engineering to all ticker groups")
dd_feat = dd_px.reset_index().groupby('ticker', group_keys=False).apply(
    add_features,
    meta=meta_dict
)
# Set ticker back as index if needed
dd_feat = dd_feat.set_index('ticker')
_logs.info("Successfully added all features:")
_logs.info("- Close_lag_1: Previous day's closing price")
_logs.info("- Adj_Close_lag_1: Previous day's adjusted closing price") 
_logs.info("- returns: (Close / Close_lag_1) - 1")
_logs.info("- hi_lo_range: High - Low")

_logs.info(f"dd_feat columns: {dd_feat.columns.tolist()}")

2025-06-08 21:13:42,856, 1209356697.py, 2, INFO, Applying feature engineering to all ticker groups
2025-06-08 21:13:42,865, 1209356697.py, 9, INFO, Successfully added all features:
2025-06-08 21:13:42,866, 1209356697.py, 10, INFO, - Close_lag_1: Previous day's closing price
2025-06-08 21:13:42,866, 1209356697.py, 11, INFO, - Adj_Close_lag_1: Previous day's adjusted closing price
2025-06-08 21:13:42,867, 1209356697.py, 12, INFO, - returns: (Close / Close_lag_1) - 1
2025-06-08 21:13:42,868, 1209356697.py, 13, INFO, - hi_lo_range: High - Low
2025-06-08 21:13:42,869, 1209356697.py, 15, INFO, dd_feat columns: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source', 'Year', 'Close_lag_1', 'Adj_Close_lag_1', 'returns', 'hi_lo_range']


+ Convert the Dask data frame to a pandas data frame. 
+ Add a new feature containing the moving average of `returns` using a window of 10 days. There are several ways to solve this task; a simple one uses `.rolling(10).mean()`.

(3 pt)

In [11]:
# Write your code below.

_logs.info("Converting Dask dataframe to pandas dataframe")
df_feat = dd_feat.compute() # pandas 
_logs.info(f"Converted to pandas dataframe with shape: {df_feat.shape}")

2025-06-08 21:13:48,418, 555958015.py, 3, INFO, Converting Dask dataframe to pandas dataframe
2025-06-08 21:14:12,296, 555958015.py, 5, INFO, Converted to pandas dataframe with shape: (346778, 13)


In [12]:
# Adding moving average of returns with 10-day window
_logs.info("Adding 10-day moving average of returns")

df_feat = df_feat.sort_values(['ticker', 'Date']) 

df_feat['returns_ma_10'] = df_feat.groupby('ticker')['returns'].transform(lambda x:x.rolling(10).mean()) # pandas 

_logs.info("Added 'returns_ma_10' feature (10-day moving average of returns).")
_logs.info(f"Final dataframe shape: {df_feat.shape}")

print("Assignment completed successfully!")
print(f"Final dataframe shape: {df_feat.shape}")
print(f"Number of columns: {len(df_feat.columns)}")

2025-06-08 21:14:15,598, 3816887444.py, 2, INFO, Adding 10-day moving average of returns
2025-06-08 21:14:15,709, 3816887444.py, 8, INFO, Added 'returns_ma_10' feature (10-day moving average of returns).
2025-06-08 21:14:15,709, 3816887444.py, 9, INFO, Final dataframe shape: (346778, 14)


Assignment completed successfully!
Final dataframe shape: (346778, 14)
Number of columns: 14
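
Grouping by ticker before calling `rolling` matters: without it, a window would mix the last rows of one ticker with the first rows of the next. The sketch below uses a window of 3 instead of 10 and made-up returns so the effect is visible on a few rows.

```python
import pandas as pd

# Toy returns for two tickers (assumed values), already sorted by ticker and date.
df = pd.DataFrame({
    "ticker": ["A"] * 4 + ["B"] * 4,
    "returns": [0.01, 0.02, 0.03, 0.04, 0.10, 0.20, 0.30, 0.40],
})

# The window restarts for each ticker, so the first window-1 rows per ticker are NaN.
df["returns_ma_3"] = (
    df.groupby("ticker")["returns"].transform(lambda x: x.rolling(3).mean())
)

print(df)
```

Each ticker starts with two `NaN` values, which is why a sample view of the moving average needs to filter on `notna()` first.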


In [13]:
# Select a nice sample that shows the moving average in action
sample_view = df_feat[df_feat['returns_ma_10'].notna()].groupby('ticker').head(3)
sample_table = sample_view[['Date', 'Close', 'returns', 'returns_ma_10']].round(4)
_logs.info(f"Sample of returns_ma_10 by ticker:\n{sample_table.head(15).to_string()}")

2025-06-08 21:14:20,129, 1889327385.py, 4, INFO, Sample of returns_ma_10 by ticker:
             Date    Close  returns  returns_ma_10
ticker                                            
A      1999-12-03  31.8312   0.0085         0.0024
A      1999-12-06  32.7253   0.0281         0.0135
A      1999-12-07  32.3677  -0.0109         0.0034
ACB    2000-01-18  39.0312  -0.0068        -0.0045
ACB    2000-01-19  39.0312   0.0000        -0.0045
ACB    2000-01-20  38.7657  -0.0068        -0.0020
ACBI   2015-11-16  14.5300  -0.0176        -0.0026
ACBI   2015-11-17  14.7500   0.0151        -0.0055
ACBI   2015-11-18  14.5700  -0.0122         0.0020
ACIO   2019-07-24  25.3200   0.0043         0.0008
ACIO   2019-07-25  25.2680  -0.0021         0.0006
ACIO   2019-07-26  25.4000   0.0052         0.0007
ALDX   2014-05-16   7.2200  -0.0296         0.0036
ALDX   2014-05-19   7.4100   0.0263         0.0132
ALDX   2014-05-20   7.3400  -0.0094         0.0212


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?

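No, converting to pandas was not strictly necessary. The same per-ticker rolling mean can be computed in Dask, for example inside the function passed to `groupby('ticker').apply(...)`. Converting was convenient here because the data set is small (346,778 rows) and pandas makes inspection, debugging, and analysis straightforward.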

+ Would it have been better to do it in Dask? Why?

Dask would be the better choice if the data set were too large for the available memory, if many operations had to be chained before the final `compute()`, or if parallel or distributed computing were available. Dask evaluates lazily: it does not load or process the data until `compute()` is called. For a data set of this size, pandas is simpler and sufficient.

(1 pt)
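
The reason this computation suits a tool like Dask is that each ticker can be processed independently of the others. The sketch below is plain pandas, not Dask: it only illustrates that computing the rolling mean piece by piece (one piece per ticker) and concatenating gives the same result as computing it on the whole frame. The data and the function name are made up, and a window of 2 is used for brevity.

```python
import pandas as pd

# Toy data (assumed values) with two interleaved tickers.
df = pd.DataFrame({
    "ticker": ["A", "B", "A", "B", "A", "B"],
    "Date": pd.to_datetime([
        "2020-01-02", "2020-01-02", "2020-01-03",
        "2020-01-03", "2020-01-06", "2020-01-06",
    ]),
    "returns": [0.01, 0.10, 0.03, 0.30, 0.05, 0.50],
})


def rolling_mean_for_one_ticker(group, window=2):
    """Hypothetical per-ticker function; needs no data from other tickers."""
    group = group.sort_values("Date")
    return group.assign(returns_ma=group["returns"].rolling(window).mean())


# Piece by piece: each ticker could, in principle, run on a different worker.
pieces = [rolling_mean_for_one_ticker(g) for _, g in df.groupby("ticker")]
by_pieces = pd.concat(pieces).sort_index()

# All at once on the full frame.
all_at_once = df.sort_values(["ticker", "Date"]).copy()
all_at_once["returns_ma"] = (
    all_at_once.groupby("ticker")["returns"].transform(lambda x: x.rolling(2).mean())
)
all_at_once = all_at_once.sort_index()

print(by_pieces["returns_ma"].equals(all_at_once["returns_ma"]))
```

Because no piece needs rows from another ticker, the work can be split without changing the result, which is the property a parallel framework relies on.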

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.