# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that the path to the price data is stored in the environment variable `PRICE_DATA`. If you have not created this data set yet, execute the notebook `01_materials/labs/2_data_engineering.ipynb` first.


+ Load the environment variables using dotenv. (1 pt)

In [1]:
# Write your code below.
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
price_data_path = os.getenv("PRICE_DATA")
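`os.getenv` returns `None` when `PRICE_DATA` is not defined (for instance, when no `.env` file was loaded). The problem then surfaces later as a confusing `TypeError` inside `os.path.join` or `glob`. The following is a minimal fail-fast sketch that uses only the standard library. The helper name `require_env` is hypothetical and not part of `python-dotenv`.

```python
import os


def require_env(name: str) -> str:
    """Hypothetical helper: return env var `name`, or raise a clear error if unset."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"{name} is not set; add it to your .env file and call load_dotenv() first"
        )
    return value


# Usage, after load_dotenv():
# price_data_path = require_env("PRICE_DATA")
```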


In [2]:
import dask.dataframe as dd



+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [3]:
from glob import glob
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
price_data_path = os.getenv("PRICE_DATA")

# Use recursive glob to find all `.parquet` files in `PRICE_DATA` and its subdirectories
parquet_files = glob(os.path.join(price_data_path, '**', '*.parquet'), recursive=True)
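With `recursive=True`, the `**` component matches zero or more directory levels. The pattern above therefore finds parquet files directly in `PRICE_DATA` as well as inside per-ticker subfolders. Below is a self-contained sketch on a throwaway directory. The folder and file names are made up for illustration, and the files are empty placeholders rather than real parquet data.

```python
import os
import tempfile
from glob import glob

with tempfile.TemporaryDirectory() as root:
    # Hypothetical layout: one file at the top level, others in a per-ticker folder
    for rel_path in ["top.parquet", "CTAS/2008.parquet", "CTAS/2009.parquet", "CTAS/notes.txt"]:
        full_path = os.path.join(root, rel_path)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        open(full_path, "w").close()  # empty placeholder file

    # '**' with recursive=True matches the root itself and every subdirectory;
    # '*.parquet' then filters out notes.txt
    found = glob(os.path.join(root, "**", "*.parquet"), recursive=True)
    relative = sorted(os.path.relpath(p, root) for p in found)

print(relative)
```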




For each ticker and using Dask, do the following:

+ Add lags for the variables `Close` and `Adj Close`.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pts)

In [101]:
import os
import pandas as pd
from dotenv import load_dotenv

# Read the price data folder from the PRICE_DATA environment variable
# instead of hard-coding a relative path
load_dotenv()
price_data_path = os.getenv("PRICE_DATA")

# Initialize a list to hold one DataFrame per ticker
all_data = []

# Loop through each ticker folder
for ticker in os.listdir(price_data_path):
    ticker_folder = os.path.join(price_data_path, ticker)

    # Skip anything that is not a directory
    if not os.path.isdir(ticker_folder):
        continue

    # Read every .parquet file in the ticker folder
    ticker_frames = [
        pd.read_parquet(os.path.join(ticker_folder, file))
        for file in os.listdir(ticker_folder)
        if file.endswith('.parquet')
    ]
    if not ticker_frames:
        continue

    # os.listdir() returns files in arbitrary order, so sort each ticker's rows by date
    data = pd.concat(ticker_frames, ignore_index=True).sort_values('Date')

    # Add a column for the ticker symbol
    data['Ticker'] = ticker

    # Append the DataFrame to the list
    all_data.append(data)

# Combine all DataFrames into one
combined_data = pd.concat(all_data, ignore_index=True)

# Display the first few rows of the combined DataFrame
print(combined_data.head())


        Date    Open    High     Low   Close  Adj Close   Volume       sector  \
0 2008-01-02  8.3725  8.4050  8.0650  8.0875   3.980135  5793600  Industrials   
1 2008-01-03  8.1525  8.2875  8.1275  8.2425   4.056416  5339200  Industrials   
2 2008-01-04  8.1625  8.1850  7.9025  7.9025   3.889090  5692400  Industrials   
3 2008-01-07  7.9050  8.2175  7.9050  8.1175   3.994899  5946000  Industrials   
4 2008-01-08  8.1350  8.1475  7.7750  7.7750   3.826344  6229600  Industrials   

                      subsector  year Ticker  
0  Diversified Support Services  2008   CTAS  
1  Diversified Support Services  2008   CTAS  
2  Diversified Support Services  2008   CTAS  
3  Diversified Support Services  2008   CTAS  
4  Diversified Support Services  2008   CTAS  


+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pts)

In [6]:
import os
import pandas as pd
import dask.dataframe as dd
from dotenv import load_dotenv

# Load environment variables
load_dotenv()
price_data_path = os.getenv("PRICE_DATA")

# Initialize a list to hold one DataFrame per ticker
all_data = []

# Loop through each ticker folder
for ticker in os.listdir(price_data_path):
    ticker_folder = os.path.join(price_data_path, ticker)
    # Skip anything that is not a directory
    if not os.path.isdir(ticker_folder):
        continue
    # Read every .parquet file in the ticker folder
    ticker_frames = [
        pd.read_parquet(os.path.join(ticker_folder, file))
        for file in os.listdir(ticker_folder)
        if file.endswith('.parquet')
    ]
    if not ticker_frames:
        continue
    # Sort each ticker's rows by date so that shifts and rolling windows follow time order
    data = pd.concat(ticker_frames, ignore_index=True).sort_values('Date')
    data['Ticker'] = ticker
    all_data.append(data)

# Combine all DataFrames into one; rows stay grouped by ticker and ordered by date
combined_data = pd.concat(all_data, ignore_index=True)

# Convert the combined DataFrame to a Dask DataFrame
dask_df = dd.from_pandas(combined_data, npartitions=2)

# Calculate returns within each ticker: take the previous row's Adj Close and
# discard it when that row belongs to a different ticker
prev_adj_close = dask_df['Adj Close'].shift(1)
same_ticker = (dask_df['Ticker'] == dask_df['Ticker'].shift(1)).fillna(False).astype(bool)
dask_df['returns'] = dask_df['Adj Close'] / prev_adj_close.where(same_ticker) - 1

# Convert Dask DataFrame to Pandas DataFrame
pandas_df = dask_df.compute()

# Calculate the 10-day rolling average return separately for each ticker
pandas_df['Rolling_Avg_Return'] = (
    pandas_df.groupby('Ticker')['returns']
    .transform(lambda s: s.rolling(window=10).mean())
)

# Display the first few rows of the final DataFrame
print(pandas_df.head())


        Date    Open    High     Low   Close  Adj Close   Volume       sector  \
0 2008-01-02  8.3725  8.4050  8.0650  8.0875   3.980135  5793600  Industrials   
1 2008-01-03  8.1525  8.2875  8.1275  8.2425   4.056416  5339200  Industrials   
2 2008-01-04  8.1625  8.1850  7.9025  7.9025   3.889090  5692400  Industrials   
3 2008-01-07  7.9050  8.2175  7.9050  8.1175   3.994899  5946000  Industrials   
4 2008-01-08  8.1350  8.1475  7.7750  7.7750   3.826344  6229600  Industrials   

                      subsector  year Ticker   returns  Rolling_Avg_Return  
0  Diversified Support Services  2008   CTAS       NaN                 NaN  
1  Diversified Support Services  2008   CTAS  0.019166                 NaN  
2  Diversified Support Services  2008   CTAS -0.041250                 NaN  
3  Diversified Support Services  2008   CTAS  0.027207                 NaN  
4  Diversified Support Services  2008   CTAS -0.042193                 NaN  


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
 Not strictly. Dask DataFrames and Series also provide `.rolling(10).mean()`, which handles windows that cross partition boundaries by borrowing rows from the preceding partition. Converting to pandas was convenient here because the combined data set is small enough to fit in memory, and on small data Dask's task scheduling adds overhead without a real benefit.

+ Would it have been better to do it in Dask? Why?
For larger data sets, yes. Dask splits the data into partitions, processes them in parallel, and only computes results when `.compute()` is called, so it can handle data that would not fit in memory as a single pandas DataFrame. For a data set of this size, pandas is simpler and fast enough.

(1 pt)

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.