# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `production_2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

# Assignment 1 answers Oct 28 0038h

In [1]:
%load_ext dotenv
%dotenv

In [2]:
# Load libraries
import os
import sys
from glob import glob

import dask.dataframe as dd
import pandas as pd

# Make the project's source directory importable, if SRC_DIR is set
src_dir = os.getenv('SRC_DIR')
if src_dir:
    sys.path.append(src_dir)



In [3]:
# Load Dask DataFrame
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)
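
As a rough illustration of the glob step, the sketch below builds a throwaway directory tree and searches it recursively. The variable name `DEMO_PRICE_DATA`, the folder layout, and the file names are assumptions made for this example only; the files are empty placeholders, not real parquet data.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as tmp_dir:
    # Hypothetical layout: one sub-folder per ticker, plus an unrelated file
    for rel in ["AAA/part.0.parquet", "BBB/part.0.parquet", "notes.txt"]:
        path = os.path.join(tmp_dir, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()  # empty placeholder, not a real parquet file

    # Stand-in for PRICE_DATA so a real setting is not overwritten
    os.environ["DEMO_PRICE_DATA"] = tmp_dir
    price_data_dir = os.getenv("DEMO_PRICE_DATA")

    # "**" together with recursive=True matches files at any depth
    pattern = os.path.join(price_data_dir, "**", "*.parquet")
    parquet_files = sorted(glob(pattern, recursive=True))

    # Keep paths relative to the data directory for easier reading
    relative_paths = [os.path.relpath(p, price_data_dir) for p in parquet_files]

print(relative_paths)
```

Only the two `.parquet` placeholders are matched; `notes.txt` is ignored by the pattern.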

In [4]:
from glob import glob

In [5]:
# Load PRICE_DATA environment variable
PRICE_DATA = os.getenv("PRICE_DATA")

if PRICE_DATA:
    # Use glob to find all parquet files in PRICE_DATA directory 
    parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive=True)
    
    # Filter out directories  
    parquet_files = [f for f in parquet_files if os.path.isfile(f)]
    
    if parquet_files:
        print(f"Found {len(parquet_files)} parquet files.")
        print("First few file paths:")
        for file in parquet_files[:5]:  # Print first 5 file paths  
            print(file)
        
        # Load parquet files into Dask df
        dd_px = dd.read_parquet(parquet_files)
        
        # Check if 'ticker' column exists before setting it as index
        if 'ticker' in dd_px.columns:
            dd_px = dd_px.set_index("ticker")
        
        print("\nDask dataframe created successfully.")
        print(f"Columns: {dd_px.columns.tolist()}")
        print(f"Number of partitions: {dd_px.npartitions}")
    else:
        print("No parquet files found in the PRICE_DATA directory.")
else:
    print("PRICE_DATA environment variable not set.")

No parquet files found in the PRICE_DATA directory.


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pts)
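
To check the arithmetic of these features before running them at scale, here is a small sketch in plain pandas (not Dask) on made-up prices. The tickers `AAA` and `BBB`, the values, and the helper name `add_features` are hypothetical; the column names follow the ones used in this notebook.

```python
import pandas as pd

# Toy price data for two hypothetical tickers (values are made up)
prices = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA", "BBB", "BBB", "BBB"],
    "Date": pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"] * 2),
    "High": [10.5, 11.5, 12.5, 21.0, 23.0, 22.0],
    "Low": [9.5, 10.0, 11.0, 19.0, 20.0, 20.5],
    "Close": [10.0, 11.0, 12.0, 20.0, 22.0, 21.0],
    "Adj Close": [10.0, 11.0, 12.1, 20.0, 22.0, 21.0],
})


def add_features(df):
    # Sort so that shift(1) always refers to the previous trading day
    df = df.sort_values(["ticker", "Date"]).copy()
    by_ticker = df.groupby("ticker")

    # Lags are taken within each ticker, so the first row of a ticker is NaN
    df["Close_lag"] = by_ticker["Close"].shift(1)
    df["Adj Close_lag"] = by_ticker["Adj Close"].shift(1)

    # returns = (Adj Close / Adj Close_lag) - 1
    df["returns"] = df["Adj Close"] / df["Adj Close_lag"] - 1

    # hi_lo_range = the day's High minus Low
    df["hi_lo_range"] = df["High"] - df["Low"]
    return df


features = add_features(prices)
print(features[["ticker", "Date", "returns", "hi_lo_range"]])
```

Grouping before shifting is the important design choice: without it, the first `BBB` row would take its lag from the last `AAA` row.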

In [6]:
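def process_ticker(group):
    # Work on a copy so the input group is not modified in place.
    # Assumes rows are already in date order within each ticker.
    group = group.copy()

    # Add lags for Close and Adj Close
    group['Close_lag'] = group['Close'].shift(1)
    group['Adj Close_lag'] = group['Adj Close'].shift(1)

    # Calculate returns based on Adjusted Close
    group['returns'] = (group['Adj Close'] / group['Adj Close_lag']) - 1

    # Calculate high-low range
    group['hi_lo_range'] = group['High'] - group['Low']

    return group

# Describe the output schema for Dask: the input columns plus four new float columns
meta = dd_px._meta.assign(**{
    'Close_lag': 0.0,
    'Adj Close_lag': 0.0,
    'returns': 0.0,
    'hi_lo_range': 0.0,
})

# Apply function to each ticker group; group_keys=False avoids adding 'ticker' to the index again
dd_feat = dd_px.groupby('ticker', group_keys=False).apply(process_ticker, meta=meta)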

print("Features added successfully.")
print(f"Columns in dd_feat: {dd_feat.columns.tolist()}")
print(f"Number of partitions: {dd_feat.npartitions}")

NameError: name 'dd_px' is not defined

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pts)
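
The tip can be tried on a toy frame first. The sketch below assumes a `returns` column already exists and uses constant made-up returns for two hypothetical tickers, so the expected rolling average is easy to verify by eye.

```python
import pandas as pd

# Twelve days of constant, made-up returns for two hypothetical tickers
returns = pd.DataFrame({
    "ticker": ["AAA"] * 12 + ["BBB"] * 12,
    "Date": list(pd.date_range("2024-01-01", periods=12)) * 2,
    "returns": [0.01] * 12 + [0.02] * 12,
})

# Sort so each window covers consecutive days of a single ticker
returns = returns.sort_values(["ticker", "Date"]).reset_index(drop=True)

# transform keeps the result aligned with the original rows
returns["rolling_avg_return"] = (
    returns.groupby("ticker")["returns"]
    .transform(lambda s: s.rolling(10).mean())
)

print(returns.tail(5))
```

The first nine rows of each ticker are NaN because a full 10-day window is not yet available; from the tenth row on, the average equals the constant return of that ticker.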

In [15]:
# Check type of dd_feat
print("Type of dd_feat:", type(dd_feat))

# Print current columns
print("\nCurrent columns:")
print(dd_feat.columns)

# Convert Dask DataFrame to pandas DataFrame
if isinstance(dd_feat, dd.DataFrame):
    df_feat = dd_feat.compute()
else:
    df_feat = dd_feat  

# Make 'ticker' a regular column (it is the index after the Dask set_index step)
if 'ticker' not in df_feat.columns:
    df_feat = df_feat.reset_index()

# Sort by ticker and date so each rolling window follows time order
df_feat = df_feat.sort_values(['ticker', 'Date']).reset_index(drop=True)

# Add 10-day rolling average return, computed within each ticker
df_feat['rolling_avg_return'] = (
    df_feat.groupby('ticker')['returns']
    .transform(lambda s: s.rolling(window=10).mean())
)

# Print information of resulting DataFrame
print("\nRolling average return added successfully.")
print(f"Shape of df_feat: {df_feat.shape}")
print(f"Columns in df_feat: {df_feat.columns.tolist()}")

# Display first few rows of DataFrame
print("\nFirst few rows of df_feat:")
print(df_feat.head())

# Display summary statistics
print("\nSummary statistics:")
print(df_feat.describe())

NameError: name 'dd_feat' is not defined

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

1. Was it necessary to convert to pandas to calculate the moving average return?

No, it was not strictly necessary. Dask provides rolling operations, including moving averages, that could have been applied directly to the Dask DataFrame.

2. Would it have been better to do it in Dask? Why?

It would have been better to use Dask, for the following reasons:

+ Scalability: Dask is designed to handle larger-than-memory datasets. If the dataset is very large, Dask can process it in chunks without loading everything into memory at once, which also tends to make it more efficient than pandas at that scale.
+ Parallel processing: Dask can use multiple cores, or even a cluster of machines, to run computations in parallel, potentially speeding up the calculation for large datasets.
+ Lazy evaluation: Dask builds a task graph of operations and executes it only when a result is requested, which can make complex chains of operations more efficient.
+ Consistency: Keeping the data in Dask format throughout the pipeline maintains consistency and allows for easier integration with other Dask operations.

In this specific case, since financial time series data can become quite large, it would likely have been better to perform the rolling average calculation in Dask.
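
One practical detail behind rolling calculations on chunked data: a window that starts near a chunk boundary needs the last `window - 1` rows of the previous chunk. The pandas-only sketch below imitates two partitions with made-up returns to show the effect; it is an illustration of the idea, not Dask's actual implementation. Dask exposes this kind of boundary sharing through `map_overlap`.

```python
import pandas as pd

window = 3
returns = pd.Series([0.01, 0.02, 0.03, 0.04, 0.05, 0.06], name="returns")

# Reference: rolling mean over the whole series at once
full = returns.rolling(window).mean()

# Two chunks standing in for two partitions
chunks = [returns.iloc[:3], returns.iloc[3:]]

# Naive approach: roll each chunk on its own, losing rows at the boundary
naive = pd.concat([c.rolling(window).mean() for c in chunks])

# Overlap-aware approach: prepend the previous chunk's last window-1 rows
pieces = []
previous_tail = None
for chunk in chunks:
    extended = chunk if previous_tail is None else pd.concat([previous_tail, chunk])
    # Keep only the rows that belong to the current chunk
    pieces.append(extended.rolling(window).mean().loc[chunk.index])
    previous_tail = extended.iloc[-(window - 1):]
with_overlap = pd.concat(pieces)

print(pd.DataFrame({"full": full, "naive": naive, "with_overlap": with_overlap}))
```

The naive column has extra NaN values at the start of the second chunk, while the overlap-aware column matches the full-series result.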

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.