# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
%load_ext dotenv
%dotenv 



In [16]:
import dask.dataframe as dd
import os
from glob import glob
import pandas as pd
from datetime import datetime
import sys

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [17]:
sys.path.append(os.getenv('SRC_DIR'))
from utils.logger import get_logger
_logs = get_logger(__name__)

In [18]:
# Load the environment variable PRICE_DATA
PRICE_DATA = os.getenv('PRICE_DATA')
_logs.info(f"Loaded PRICE_DATA: {PRICE_DATA}")




2025-06-11 01:26:20,375, 2330488740.py, 3, INFO, Loaded PRICE_DATA: ../../05_src/data/prices/
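`os.getenv` returns `None` when a variable is missing, which would only surface later as a confusing error in `os.path.join`. A minimal sketch of failing early instead is shown below; the helper name `require_env` is hypothetical and not part of the course's `utils` package, and the value set with `setdefault` is there only so the example runs on its own.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set; check your .env file.")
    return value


# Illustration only: provide a value if none is set so the sketch is runnable in isolation.
os.environ.setdefault("PRICE_DATA", "../../05_src/data/prices/")

PRICE_DATA = require_env("PRICE_DATA")
print(PRICE_DATA)
```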


In [19]:
# Using glob to find the path of all parquet files
parquet_files = glob(os.path.join(PRICE_DATA, "**", "*.parquet"), recursive=True)
print(f"Found {len(parquet_files)} parquet files")


Found 2725 parquet files


In [20]:
# Verification
_logs.info(f"First 5 parquet files: {parquet_files[:5]}")

2025-06-11 01:27:26,994, 4029740748.py, 2, INFO, First 5 parquet files: ['../../05_src/data/prices/BKTI/BKTI_2012/part.0.parquet', '../../05_src/data/prices/BKTI/BKTI_2012/part.1.parquet', '../../05_src/data/prices/BKTI/BKTI_2015/part.0.parquet', '../../05_src/data/prices/BKTI/BKTI_2015/part.1.parquet', '../../05_src/data/prices/BKTI/BKTI_2014/part.0.parquet']
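As an additional sanity check, the ticker can be read back from each path. The sketch below assumes the directory layout suggested by the logged paths (`<PRICE_DATA>/<ticker>/<ticker>_<year>/part.N.parquet`) and uses the five paths from the log rather than the full `parquet_files` list; the helper `ticker_of` is hypothetical.

```python
from collections import Counter
from pathlib import PurePosixPath

# The five paths shown in the log output above.
paths = [
    "../../05_src/data/prices/BKTI/BKTI_2012/part.0.parquet",
    "../../05_src/data/prices/BKTI/BKTI_2012/part.1.parquet",
    "../../05_src/data/prices/BKTI/BKTI_2015/part.0.parquet",
    "../../05_src/data/prices/BKTI/BKTI_2015/part.1.parquet",
    "../../05_src/data/prices/BKTI/BKTI_2014/part.0.parquet",
]


def ticker_of(path: str) -> str:
    """Return the ticker directory name, assuming .../<ticker>/<ticker>_<year>/part.N.parquet."""
    return PurePosixPath(path).parts[-3]


# Count how many parquet files belong to each ticker.
files_per_ticker = Counter(ticker_of(p) for p in paths)
print(files_per_ticker)
```

On Windows the paths returned by `glob` may use backslashes, in which case `pathlib.Path` would be the safer choice.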


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)
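To make the definitions above concrete, here is a small pandas-only sketch on made-up prices for two hypothetical tickers (`AAA` and `BBB`). It is an illustration of the formulas, not the Dask solution; the point to notice is that the lag is taken within each ticker, so the first row of every ticker has no previous close and therefore no return.

```python
import pandas as pd

# Made-up prices for two hypothetical tickers.
toy = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB"],
    "Date": pd.to_datetime(
        ["2024-01-02", "2024-01-03", "2024-01-04", "2024-01-02", "2024-01-03"]
    ),
    "High": [10.5, 11.5, 12.5, 20.5, 22.0],
    "Low": [9.5, 10.0, 11.0, 19.5, 20.0],
    "Close": [10.0, 11.0, 12.1, 20.0, 21.0],
})

toy = toy.sort_values(["ticker", "Date"])

# Lag within each ticker so BBB's first row does not see AAA's last close.
toy["Close_lag_1"] = toy.groupby("ticker")["Close"].shift(1)

# returns: (Close / Close_lag_1) - 1
toy["returns"] = toy["Close"] / toy["Close_lag_1"] - 1

# hi_lo_range: the day's High minus Low
toy["hi_lo_range"] = toy["High"] - toy["Low"]

print(toy)
```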

In [24]:
# DO NOT set ticker as index - keep default index
dd_px = dd.read_parquet(parquet_files)

_logs.info("Loaded parquet files into Dask Dataframe (dd_px).")
print("\n")
print(f"dd_px dtypes: {dd_px.dtypes}")

# Columns
columns = dd_px.columns.tolist()
print(f"Columns in the dataset: {columns}")


2025-06-11 01:29:39,900, 2114134884.py, 4, INFO, Loaded parquet files into Dask Dataframe (dd_px).




dd_px dtypes: Date          datetime64[ns]
Open                 float64
High                 float64
Low                  float64
Close                float64
Adj Close            float64
Volume               float64
source       string[pyarrow]
ticker       string[pyarrow]
Year                   int32
dtype: object
Columns in the dataset: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source', 'ticker', 'Year']


In [25]:
def add_features(group):
    """Function to add features to each ticker group"""
    # Sort by date to ensure proper lag calculations
    group = group.sort_values('Date')
    
    # Add lag feature
    group = group.assign(
        Close_lag_1=group['Close'].shift(1),
        Adj_Close_lag_1=group['Adj Close'].shift(1)
    )
    
    # Add returns
    group = group.assign(
        returns=(group['Close'] / group['Close_lag_1']) - 1
    )
    
    # Add hi_lo range
    group = group.assign(
        hi_lo_range=group['High'] - group['Low']
    )
    
    return group

In [26]:
# Get a small sample to create proper meta specification
_logs.info("Creating meta specification")
sample_data = dd_px.head(100)
sample_ticker = sample_data['ticker'].iloc[0]
sample_group = sample_data[sample_data['ticker'] == sample_ticker]
sample_with_features = add_features(sample_group)

2025-06-11 01:30:49,273, 3577835805.py, 2, INFO, Creating meta specification


In [27]:
# Create meta dictionary based on sample
meta_dict = {col: sample_with_features[col].dtype for col in sample_with_features.columns}
_logs.info(f"Meta specification created with {len(meta_dict)} columns")

2025-06-11 01:31:04,379, 2825510153.py, 3, INFO, Meta specification created with 14 columns
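The meta specification only needs column names and dtypes, not data. As a hedged alternative to sampling real rows, the sketch below derives the same kind of dictionary from a zero-row frame. It is pandas-only, and `add_lag` is a hypothetical stand-in for `add_features` that adds a single column.

```python
import pandas as pd


def add_lag(group: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for add_features: adds one lag column."""
    group = group.sort_values("Date")
    return group.assign(Close_lag_1=group["Close"].shift(1))


# Made-up sample with the same kinds of columns as the price data.
sample = pd.DataFrame({
    "Date": pd.to_datetime(["2024-01-02", "2024-01-03"]),
    "ticker": ["AAA", "AAA"],
    "Close": [10.0, 11.0],
})

# A zero-row slice keeps the column names and dtypes but holds no data.
empty = sample.iloc[:0]
meta_frame = add_lag(empty)

# Same shape of dictionary as the meta_dict built in the cell above.
meta_dict = {col: meta_frame[col].dtype for col in meta_frame.columns}
print(meta_dict)
```

Dask also accepts an empty pandas data frame with matching columns and dtypes as `meta`, so `meta_frame` itself could be passed instead of the dictionary.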


In [28]:
# Apply group function to the whole data - no need to reset_index since we didn't set one
_logs.info("Applying feature engineering to all ticker groups")
dd_feat = dd_px.groupby('ticker', group_keys=False).apply(
    add_features,
    meta=meta_dict
)

_logs.info("Successfully added all features:")
_logs.info("- Close_lag_1: Previous day's closing price")
_logs.info("- Adj_Close_lag_1: Previous day's adjusted closing price")
_logs.info("- returns: (Close / Close_lag_1) - 1")
_logs.info("- hi_lo_range: High - Low")

_logs.info(f"dd_feat columns: {dd_feat.columns.tolist()}")

2025-06-11 01:31:44,819, 645997329.py, 2, INFO, Applying feature engineering to all ticker groups
2025-06-11 01:31:44,824, 645997329.py, 8, INFO, Successfully added all features:
2025-06-11 01:31:44,825, 645997329.py, 9, INFO, - Close_lag_1: Previous day's closing price
2025-06-11 01:31:44,826, 645997329.py, 10, INFO, - Adj_Close_lag_1: Previous day's adjusted closing price
2025-06-11 01:31:44,827, 645997329.py, 11, INFO, - returns: (Close / Close_lag_1) - 1
2025-06-11 01:31:44,827, 645997329.py, 12, INFO, - hi_lo_range: High - Low
2025-06-11 01:31:44,828, 645997329.py, 14, INFO, dd_feat columns: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source', 'ticker', 'Year', 'Close_lag_1', 'Adj_Close_lag_1', 'returns', 'hi_lo_range']


In [29]:
# Convert to pandas
_logs.info("Converting Dask dataframe to pandas dataframe")
df_feat = dd_feat.compute()
_logs.info(f"Converted to pandas dataframe with shape: {df_feat.shape}")


2025-06-11 01:32:23,610, 4243127812.py, 2, INFO, Converting Dask dataframe to pandas dataframe
2025-06-11 01:32:40,992, 4243127812.py, 4, INFO, Converted to pandas dataframe with shape: (311001, 14)


In [30]:
# Adding moving average of returns with 10-day window
_logs.info("Adding 10-day moving average of returns")

# Sort ticker and date to ensure proper order for rolling calculations
df_feat = df_feat.sort_values(['ticker', 'Date'])

# Calculate 10 day moving average of returns for each ticker
df_feat['returns_ma_10'] = df_feat.groupby('ticker')['returns'].transform(
    lambda x: x.rolling(10).mean()
)

_logs.info("Added 'returns_ma_10' feature (10-day moving average of returns).")
_logs.info(f"Final dataframe shape: {df_feat.shape}")

2025-06-11 01:32:58,318, 1889558176.py, 2, INFO, Adding 10-day moving average of returns
2025-06-11 01:32:58,430, 1889558176.py, 12, INFO, Added 'returns_ma_10' feature (10-day moving average of returns).
2025-06-11 01:32:58,430, 1889558176.py, 13, INFO, Final dataframe shape: (311001, 15)


In [31]:
# Simple verification - just show basic info
print("Assignment completed successfully!")
print(f"Final dataframe shape: {df_feat.shape}")
print(f"Number of columns: {len(df_feat.columns)}")

_logs.info(f"Final columns: {df_feat.columns.tolist()}")

2025-06-11 01:33:13,178, 3463744243.py, 6, INFO, Final columns: ['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'source', 'ticker', 'Year', 'Close_lag_1', 'Adj_Close_lag_1', 'returns', 'hi_lo_range', 'returns_ma_10']


Assignment completed successfully!
Final dataframe shape: (311001, 15)
Number of columns: 15


In [32]:
# Select a nice sample that shows the moving average in action
sample_view = df_feat[df_feat['returns_ma_10'].notna()].groupby('ticker').head(3)
sample_table = sample_view[['ticker', 'Date', 'Close', 'returns', 'returns_ma_10']].round(4)
_logs.info(f"Sample of returns_ma_10 by ticker:\n{sample_table.head(15).to_string()}")

2025-06-11 01:33:22,513, 4125431718.py, 4, INFO, Sample of returns_ma_10 by ticker:
       ticker       Date     Close  returns  returns_ma_10
38749       A 1999-12-03   31.8312   0.0085         0.0024
38750       A 1999-12-06   32.7253   0.0281         0.0135
38751       A 1999-12-07   32.3677  -0.0109         0.0034
131198    ACB 2000-01-18   39.0312  -0.0068        -0.0045
131199    ACB 2000-01-19   39.0312   0.0000        -0.0045
131200    ACB 2000-01-20   38.7657  -0.0068        -0.0020
2618     ALDX 2014-05-16    7.2200  -0.0296         0.0036
2619     ALDX 2014-05-19    7.4100   0.0263         0.0132
2620     ALDX 2014-05-20    7.3400  -0.0094         0.0212
206167   AMAT 1980-03-31    0.0946   0.1122         0.0015
206168   AMAT 1980-04-01    0.0938  -0.0092         0.0024
206169   AMAT 1980-04-02    0.0885  -0.0556        -0.0050
36750    AMPY 2012-05-04  144.6000  -0.0062        -0.0032
36751    AMPY 2012-05-07  142.2000  -0.0166        -0.0089
36752    AMPY 2012-05-08  142.0

+ Convert the Dask data frame to a pandas data frame. 
+ Add a new feature containing the moving average of `returns` using a window of 10 days. There are several ways to solve this task; a simple one uses `.rolling(10).mean()`.

(3 pt)

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

It was not strictly necessary to convert to pandas. Dask data frames support rolling window operations such as `.rolling(10).mean()`, with some limitations: the window runs over rows in partition order rather than per ticker, so the data must be sorted by date and the calculation must be kept within each ticker group. In this case the data set fits comfortably in memory (311,001 rows), so converting to pandas made the rolling calculation more straightforward and allowed immediate inspection of results.

However, for large-scale datasets that don't fit into memory, it would be better to keep the data in Dask, for example by computing the moving average inside the per-ticker function passed to `groupby('ticker').apply`, so that the full data set never has to be collected into a single pandas data frame.
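A minimal sketch of the per-ticker approach follows. The function `add_returns_ma` is hypothetical and is checked here on a made-up pandas frame; with the notebook's Dask frame, the same function could be passed to `dd_feat.groupby('ticker', group_keys=False).apply(...)` with a `meta` that includes the new column, in the same way `add_features` was applied earlier.

```python
import pandas as pd


def add_returns_ma(group: pd.DataFrame, window: int = 10) -> pd.DataFrame:
    """Add a rolling mean of `returns` computed within one ticker's rows."""
    group = group.sort_values("Date")
    ma = group["returns"].rolling(window).mean()
    return group.assign(**{f"returns_ma_{window}": ma})


# Made-up returns for one hypothetical ticker: 0.00, 0.01, ..., 0.11.
toy = pd.DataFrame({
    "ticker": ["AAA"] * 12,
    "Date": pd.date_range("2024-01-01", periods=12, freq="D"),
    "returns": [0.01 * i for i in range(12)],
})

out = add_returns_ma(toy)

# The first nine rows have no full 10-row window, so their average is NaN.
print(out[["Date", "returns", "returns_ma_10"]].tail(4))
```

Because the function sorts and rolls inside a single group, the result does not depend on how the rows were distributed across partitions before the group was assembled.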

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.