# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that the location of the price data is stored in the environment variable `PRICE_DATA`. If this data set does not exist yet, execute the notebook `production_2_data_engineering.ipynb` to create it.


+ Load the environment variables using dotenv. (1 pt)

In [9]:
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv()

# Access the variable
price_data = os.getenv('PRICE_DATA')

# Print the path to verify that it was loaded correctly
print("PRICE_DATA:", price_data)


PRICE_DATA: ../../05_src/data/prices/


In [10]:
type(price_data)

str

In [4]:
price_data

'../../05_src/data/prices/'

In [11]:
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the paths of all parquet files in the directory `PRICE_DATA`.

(1 pt)

In [None]:
import glob

# Load environment variables from .env file
load_dotenv()

# Access the PRICE_DATA environment variable
price_data_dir = os.getenv('PRICE_DATA')

# Verify that PRICE_DATA is loaded correctly
print("PRICE_DATA:", price_data_dir)

# Check if PRICE_DATA is set and use glob to find all .parquet files recursively in subdirectories
if price_data_dir:
    # Use '**/*.parquet' to search through all subdirectories
    parquet_files = glob.glob(os.path.join(price_data_dir, "**", "*.parquet"), recursive=True)

    # Print the list of parquet files found
    if parquet_files:
        print("Parquet files found in PRICE_DATA directory and its subdirectories:")
        for file in parquet_files:
            print(file)
    else:
        print("No parquet files found in the PRICE_DATA directory and its subdirectories.")
else:
    print("PRICE_DATA environment variable is not set.")


PRICE_DATA: ../../05_src/data/prices/
Parquet files found in PRICE_DATA directory and its subdirectories:
../../05_src/data/prices\A\A_2000.parquet
../../05_src/data/prices\A\A_2001.parquet
../../05_src/data/prices\A\A_2002.parquet
../../05_src/data/prices\A\A_2003.parquet
../../05_src/data/prices\A\A_2004.parquet
../../05_src/data/prices\A\A_2005.parquet
../../05_src/data/prices\A\A_2006.parquet
../../05_src/data/prices\A\A_2007.parquet
../../05_src/data/prices\A\A_2008.parquet
../../05_src/data/prices\A\A_2009.parquet
../../05_src/data/prices\A\A_2010.parquet
../../05_src/data/prices\A\A_2011.parquet
../../05_src/data/prices\A\A_2012.parquet
../../05_src/data/prices\A\A_2013.parquet
../../05_src/data/prices\A\A_2014.parquet
../../05_src/data/prices\A\A_2015.parquet
../../05_src/data/prices\A\A_2016.parquet
../../05_src/data/prices\A\A_2017.parquet
../../05_src/data/prices\A\A_2018.parquet
../../05_src/data/prices\A\A_2019.parquet
../../05_src/data/prices\A\A_2020.parquet
../../05_src
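The next task asks for features *per ticker*, but each file found above holds one ticker-year. A minimal sketch for grouping the paths by ticker, assuming the layout visible in the output (`<PRICE_DATA>/<TICKER>/<TICKER>_<YEAR>.parquet`, so the ticker is the parent folder's name). `group_parquet_by_ticker` is a hypothetical helper, not part of `data_manager`:

```python
import glob
import os
from collections import defaultdict


def group_parquet_by_ticker(price_dir):
    """Return {ticker: sorted list of parquet paths} for price_dir.

    Assumes the layout <price_dir>/<TICKER>/<TICKER>_<YEAR>.parquet,
    i.e. the ticker is the name of each file's parent folder.
    """
    pattern = os.path.join(price_dir, "**", "*.parquet")
    files_by_ticker = defaultdict(list)
    for file in glob.glob(pattern, recursive=True):
        # On Windows, dirname/basename accept both "/" and "\", so mixed
        # separators like those in the output above are handled too
        ticker = os.path.basename(os.path.dirname(file))
        files_by_ticker[ticker].append(file)
    # Sort each list so a ticker's years come back in order (A_2000, A_2001, ...)
    return {ticker: sorted(files) for ticker, files in files_by_ticker.items()}
```

Each ticker's list could then be read in a single call, e.g. `dd.read_parquet(files_by_ticker["A"])`, since `dd.read_parquet` accepts a list of paths.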

For each ticker and using Dask, do the following:

+ Add lags for the variables `Close` and `Adj Close`.
+ Add returns based on `Adj Close`:
    
    - `returns`: (`Adj Close` / `Adj_Close_lag`) - 1

+ Add the following range:

    - `hi_lo_range`: the day's `High` minus `Low`.

+ Assign the result to `dd_feat`.

(4 pts)
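The lag and return definitions above only make sense within one ticker's history. A small pandas sketch of the intended semantics on hypothetical toy data; the `ticker` column is an assumption (the parquet files shown below do not contain one, so it would have to be added, for example from the folder name). Grouping by ticker before `shift(1)` keeps lags from leaking between tickers and lets them carry across year boundaries, which processing one file at a time does not:

```python
import numpy as np
import pandas as pd


def add_price_features(df):
    """Add Close/Adj Close lags, returns and hi_lo_range, per ticker (pandas)."""
    df = df.sort_values(["ticker", "Date"]).copy()
    by_ticker = df.groupby("ticker")
    # Lag within each ticker: the first row of every ticker gets NaN
    df["Close_lag"] = by_ticker["Close"].shift(1)
    df["Adj_Close_lag"] = by_ticker["Adj Close"].shift(1)
    # returns = (Adj Close / Adj_Close_lag) - 1
    df["returns"] = df["Adj Close"] / df["Adj_Close_lag"] - 1
    df["hi_lo_range"] = df["High"] - df["Low"]
    return df


# Hypothetical toy data: ticker A spans the 2000/2001 year boundary
toy = pd.DataFrame({
    "ticker": ["A", "A", "A", "B", "B"],
    "Date": pd.to_datetime(["2000-12-28", "2000-12-29", "2001-01-02",
                            "2000-12-29", "2001-01-02"]),
    "High": [11.0, 12.0, 13.0, 21.0, 22.0],
    "Low": [9.0, 10.0, 11.5, 19.0, 20.0],
    "Close": [10.0, 11.0, 12.0, 20.0, 21.0],
    "Adj Close": [10.0, 11.0, 12.1, 20.0, 21.0],
})
feat = add_price_features(toy)
print(feat[["ticker", "Date", "Adj_Close_lag", "returns", "hi_lo_range"]])
```

In Dask, the same logic needs all of a ticker's rows to be processed together, e.g. through a shuffle such as `ddf.groupby("ticker").apply(func, meta=...)`; treat this as a pointer to check against the Dask documentation rather than a tested recipe.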

In [13]:
import dask.dataframe as dd

# Load environment variables from .env file
load_dotenv()

# Access the PRICE_DATA environment variable
price_data_dir = os.getenv('PRICE_DATA')

# Define the recursive path to find all parquet files
parquet_files = glob.glob(os.path.join(price_data_dir, "**", "*.parquet"), recursive=True)

# Initialize an empty list to collect processed data for each ticker
ticker_dataframes = []

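# Process each parquet file. Each file holds one ticker-year (e.g. A/A_2000.parquet),
# so the lags below restart at every year boundary: the first row of each file gets NaN.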
for file in parquet_files:
    # Load each parquet file as a Dask DataFrame
    df = dd.read_parquet(file)

    # Print the columns to verify the presence of expected columns
    print(f"Processing {file}")
    print("Columns in the DataFrame:", df.columns)

    # Sort by date to ensure lags are calculated correctly
    df = df.sort_values(by='Date')

    # Add lagged values for 'Close' and 'Adj Close'
    df['Close_lag'] = df['Close'].shift(1)

    if 'Adj Close' in df.columns:
        df['Adj_Close_lag'] = df['Adj Close'].shift(1)
        # Calculate returns based on Adjusted Close
        df['returns'] = (df['Adj Close'] / df['Adj_Close_lag']) - 1
    else:
        print(f"Adj Close column not found in {file}. Skipping returns calculation for this file.")

    # Calculate the high-low range for each day
    if 'High' in df.columns and 'Low' in df.columns:
        df['hi_lo_range'] = df['High'] - df['Low']
    else:
        print(f"High or Low column not found in {file}. Skipping hi_lo_range calculation for this file.")

    # Append the processed DataFrame to the list
    ticker_dataframes.append(df)




Processing ../../05_src/data/prices\A\A_2000.parquet
Columns in the DataFrame: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year'],
      dtype='object')
Processing ../../05_src/data/prices\A\A_2001.parquet
Columns in the DataFrame: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year'],
      dtype='object')
Processing ../../05_src/data/prices\A\A_2002.parquet
Columns in the DataFrame: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year'],
      dtype='object')
Processing ../../05_src/data/prices\A\A_2003.parquet
Columns in the DataFrame: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
       'subsector', 'year'],
      dtype='object')
Processing ../../05_src/data/prices\A\A_2004.parquet
Columns in the DataFrame: Index(['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'sector',
 

In [None]:
import math

batch_size = 10  # Concatenate the per-file frames 10 at a time
n_batches = math.ceil(len(ticker_dataframes) / batch_size)
batched_dataframes = []

for i in range(0, len(ticker_dataframes), batch_size):
    batch = ticker_dataframes[i:i + batch_size]
    print(f"Concatenating batch {i // batch_size + 1} of {n_batches}...")
    
    # Concatenate the batch and add it to the batched list (dd.concat is lazy: it only extends the task graph)
    batched_dataframes.append(dd.concat(batch))
    
    print(f"Batch {i // batch_size + 1} concatenated.")

# Final concatenation of all batches (still lazy until .compute() is called)
print("Concatenating all batches into final DataFrame...")
dd_feat = dd.concat(batched_dataframes)
print("Final concatenation complete.")



Concatenating batch 1 of 2242...
Batch 1 concatenated.
Concatenating batch 2 of 2242...
Batch 2 concatenated.
Concatenating batch 3 of 2242...
Batch 3 concatenated.
Concatenating batch 4 of 2242...
Batch 4 concatenated.
Concatenating batch 5 of 2242...
Batch 5 concatenated.
Concatenating batch 6 of 2242...
Batch 6 concatenated.
Concatenating batch 7 of 2242...
Batch 7 concatenated.
Concatenating batch 8 of 2242...
Batch 8 concatenated.
Concatenating batch 9 of 2242...
Batch 9 concatenated.
Concatenating batch 10 of 2242...
Batch 10 concatenated.
Concatenating batch 11 of 2242...
Batch 11 concatenated.
Concatenating batch 12 of 2242...
Batch 12 concatenated.
Concatenating batch 13 of 2242...
Batch 13 concatenated.
Concatenating batch 14 of 2242...
Batch 14 concatenated.
Concatenating batch 15 of 2242...
Batch 15 concatenated.
Concatenating batch 16 of 2242...
Batch 16 concatenated.
Concatenating batch 17 of 2242...
Batch 17 concatenated.
Concatenating batch 18 of 2242...
Batch 18 concat

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pts)
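A sketch of the 10-day rolling average, assuming a pandas frame with hypothetical `ticker`, `Date` and `returns` columns. Using `groupby(...).transform` keeps each window inside one ticker; a plain `.rolling(10).mean()` on a concatenated frame would average across the boundary between two tickers. With pandas' default `min_periods=window`, a row only gets a value once the current row and the 9 before it all have non-missing returns:

```python
import numpy as np
import pandas as pd


def add_rolling_return(df, window=10):
    """Add a per-ticker rolling mean of `returns` (pandas sketch)."""
    df = df.sort_values(["ticker", "Date"]).copy()
    df["rolling_avg_return"] = (
        df.groupby("ticker")["returns"]
        .transform(lambda s: s.rolling(window).mean())
    )
    return df


# Hypothetical toy data: 12 days for ticker A (first return is NaN), 3 days for B
toy = pd.DataFrame({
    "ticker": ["A"] * 12 + ["B"] * 3,
    "Date": list(pd.date_range("2001-01-01", periods=12))
    + list(pd.date_range("2001-01-01", periods=3)),
    "returns": [np.nan] + [float(k) for k in range(1, 12)] + [0.1, 0.2, 0.3],
})
out = add_rolling_return(toy)
print(out)
```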

In [None]:

import pandas as pd


# Load environment variables and access the PRICE_DATA directory
load_dotenv()
price_data_dir = os.getenv('PRICE_DATA')
parquet_files = glob.glob(os.path.join(price_data_dir, "**", "*.parquet"), recursive=True)

# List to hold processed pandas DataFrames
px_list = []

# Process each parquet file (one ticker-year per file)
for file in parquet_files:
    print(f"Processing {file}")

    # Read the parquet file with Dask and convert it to a pandas DataFrame
    df = dd.read_parquet(file).compute()

    # Sort by date to ensure calculations are correct
    df = df.sort_values(by='Date')

    # Add lagged values and calculate returns
    df['Close_lag'] = df['Close'].shift(1)
    
    if 'Adj Close' in df.columns:
        df['Adj_Close_lag'] = df['Adj Close'].shift(1)
        df['returns'] = (df['Adj Close'] / df['Adj_Close_lag']) - 1
    else:
        print(f"Adj Close column not found in {file}. Skipping returns calculation for this file.")
        continue

    # Calculate high-low range if columns are available
    if 'High' in df.columns and 'Low' in df.columns:
        df['hi_lo_range'] = df['High'] - df['Low']
    else:
        print(f"High or Low column not found in {file}. Skipping hi_lo_range calculation for this file.")

    # Calculate the 10-day rolling average return (restarts in every file, i.e. every ticker-year)
    df['rolling_avg_return'] = df['returns'].rolling(window=10).mean()

    # Append processed DataFrame to the list
    px_list.append(df)

# Concatenate all DataFrames into a single pandas DataFrame
px_dt = pd.concat(px_list, axis=0).reset_index(drop=True)
print(f"Final shape of concatenated DataFrame: {px_dt.shape}")
print(px_dt.head())



KeyboardInterrupt: 

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)
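On whether pandas is required: as far as we understand its implementation, Dask's `rolling` computes each partition with `map_overlap`, borrowing the last `window - 1` rows of the preceding partition. The plain-pandas sketch below (hypothetical helper `chunked_rolling_mean`, random data) reproduces that idea and checks that it matches a single full-series rolling mean, which is why a fixed-size window can in principle be computed without converting to pandas:

```python
import numpy as np
import pandas as pd


def chunked_rolling_mean(series, window, n_chunks):
    """Rolling mean computed chunk by chunk; each chunk borrows the previous
    window - 1 rows (the overlap idea behind Dask's map_overlap)."""
    bounds = np.linspace(0, len(series), n_chunks + 1, dtype=int)
    pieces = []
    for start, stop in zip(bounds[:-1], bounds[1:]):
        halo = max(start - (window - 1), 0)  # first borrowed row
        block = series.iloc[halo:stop].rolling(window).mean()
        pieces.append(block.iloc[start - halo:])  # drop the borrowed rows again
    return pd.concat(pieces)


rng = np.random.default_rng(0)
returns = pd.Series(rng.normal(0, 0.01, size=250))
full = returns.rolling(10).mean()
chunked = chunked_rolling_mean(returns, window=10, n_chunks=7)
print((full - chunked).abs().max())
```

For per-ticker windows the data would still have to be grouped or partitioned by ticker first.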

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/scaling_to_production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.

In [None]:
# Check structure: Dask DataFrames have no public `.meta` attribute;
# `.dtypes` lists the column names and types without computing any data
print(dd_feat.dtypes)

In [None]:
from tqdm import tqdm

In [None]:
# Step 1: Check the number of partitions
num_partitions = dd_feat.npartitions
print("Number of partitions:", num_partitions)


Number of partitions: 22414


In [None]:
# Step 2: Get the row count of the first partition
sample_partition = dd_feat.get_partition(0).compute()
rows_in_first_partition = len(sample_partition)
print("Rows in the first partition:", rows_in_first_partition)


KeyboardInterrupt: 

In [None]:
import pandas as pd
import dask.dataframe as dd
import numpy as np

# Create a small Pandas DataFrame with sample data similar to your actual data
data = {
    'Date': pd.date_range(start='2023-01-01', periods=10, freq='D'),
    'Open': np.random.rand(10) * 100,
    'High': np.random.rand(10) * 100,
    'Low': np.random.rand(10) * 100,
    'Close': np.random.rand(10) * 100,
    'Adj Close': np.random.rand(10) * 100,
    'Volume': np.random.randint(1000, 10000, size=10),
    'sector': ['Tech', 'Finance', 'Healthcare', 'Energy', 'Consumer'] * 2,
    'subsector': ['Software', 'Banking', 'Pharma', 'Oil', 'Retail'] * 2,
    'year': [2023] * 10,
    'Close_lag': np.nan,         # Placeholder for lagged columns
    'Adj_Close_lag': np.nan,     # Placeholder for lagged columns
    'returns': np.nan,           # Placeholder for calculated values
    'hi_lo_range': np.nan        # Placeholder for calculated values
}

# Convert the Pandas DataFrame to a Dask DataFrame with 2 partitions
pdf = pd.DataFrame(data)
ddf = dd.from_pandas(pdf, npartitions=2)

# Display the Dask DataFrame
print("Dummy Dask DataFrame:")
print(ddf.compute())


Dummy Dask DataFrame:
        Date       Open       High        Low      Close  Adj Close  Volume  \
0 2023-01-01  17.394983  56.809444  28.876770  96.603774  53.608396    5635   
1 2023-01-02  28.313843  14.564357  41.763181  57.069472  10.099390    6193   
2 2023-01-03  79.899848  33.504172  56.408415  48.392604  82.248235    2030   
3 2023-01-04  90.179121  94.769820  21.716816  30.411982  25.179614    8929   
4 2023-01-05   9.131637  15.277236  33.904000  27.968384  93.750159    1284   
5 2023-01-06  85.322944  73.244320  23.316784  57.901926  73.702060    2646   
6 2023-01-07  91.636555  28.406865  84.750780  12.040173   6.638608    9235   
7 2023-01-08  71.431398  26.981535  62.689561  37.486646  33.216901    8848   
8 2023-01-09  92.458058  71.716970  89.035122  38.378198  96.886421    1369   
9 2023-01-10  40.899648  68.885372   3.226697  60.389026  15.684330    1405   

       sector subsector  year  Close_lag  Adj_Close_lag  returns  hi_lo_range  
0        Tech  Software  202

In [None]:
ddf.shape[0].compute()

10

In [None]:
import time
from tqdm import tqdm

# Create a list of items to process
items = range(100)  # Adjust the range for longer or shorter runs

# Simulate a time-consuming operation with tqdm progress bar
print("Processing items with a simulated delay:")
for item in tqdm(items, desc="Progress", unit="item"):
    # Simulate a delay (e.g., a time-consuming calculation)
    time.sleep(0.1)  # 0.1 seconds delay per item (100 items * 0.1s = ~10 seconds total)


Processing items with a simulated delay:


Progress: 100%|██████████| 100/100 [00:10<00:00,  9.14item/s]
