# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [8]:
# Loading environment variables from .env file

%load_ext dotenv
%dotenv 



The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [2]:
import dask.dataframe as dd



+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1pt)

In [3]:
import os
from glob import glob

PRICE_DATA = os.getenv("PRICE_DATA")

parquet_files = glob(os.path.join(PRICE_DATA, "**/*.parquet"), recursive = True)

For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [34]:
# Reading parquet files
# Setting index to Ticker
dd_px = dd.read_parquet(parquet_files).set_index("Ticker")


# Add lags for variables Close and Adj_Close
dd_shift = dd_px.groupby('Ticker', group_keys=False).apply(
    lambda x: x.assign(
        Close_lag_1 = x['Close'].shift(1),
        Adj_Close_lag_1 = x['Adj Close'].shift(1)
    )
)


# Add returns based on Close
dd_rets = dd_shift.assign(
    Returns = lambda x: (x['Close'] / x['Close_lag_1']) - 1
)


# Add the 'hi_lo_range' : this is the day's High minus Low.
dd_range = dd_rets.assign(
    hi_lo_range = lambda x: x['High'] - x['Low']
)


# Assign the result to 'dd_feat'
dd_feat = dd_range


dd_feat

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  dd_shift = dd_px.groupby('Ticker', group_keys=False).apply(


Unnamed: 0_level_0,Date,Adj Close,Close,High,Low,Open,Volume,Year,Close_lag_1,Adj_Close_lag_1,Returns,hi_lo_range
npartitions=13078,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
,"datetime64[ns, UTC]",float64,float64,float64,float64,float64,float64,int32,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...


+ Convert the Dask data frame to a pandas data frame. 
+ Add a new feature containing the moving average of `returns` using a window of 10 days. There are several ways to solve this task, a simple one uses `.rolling(10).mean()`.

(3 pt)

In [37]:
# Convert to pandas dataframe
panda_feat = dd_feat.compute()


# Adding a new feature: moving average of 'returns' for the last 10 days
panda_feat['Returns_MA_10'] = panda_feat.groupby("Ticker")["Returns"].rolling(10).mean()


panda_feat

Price,Date,Adj Close,Close,High,Low,Open,Volume,Year,Close_lag_1,Adj_Close_lag_1,Returns,hi_lo_range
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
DOV,2002-01-02 00:00:00+00:00,16.045654,24.842640,24.963299,24.420326,24.869452,1022622.0,2002,,,,0.542973
DOV,2002-01-03 00:00:00+00:00,16.340069,25.298468,25.405722,24.835936,24.876156,989355.0,2002,24.842640,16.045654,0.018349,0.569786
DOV,2002-01-04 00:00:00+00:00,16.721081,25.888363,25.955397,25.405722,25.405722,1319488.0,2002,25.298468,16.340069,0.023317,0.549675
DOV,2002-01-07 00:00:00+00:00,16.608498,25.714075,26.136387,25.687262,25.975506,1060812.0,2002,25.888363,16.721081,-0.006732,0.449125
DOV,2002-01-08 00:00:00+00:00,16.236162,25.137587,25.807922,24.869452,25.673855,976377.0,2002,25.714075,16.608498,-0.022419,0.938471
...,...,...,...,...,...,...,...,...,...,...,...,...
CTLT,2011-12-23 00:00:00+00:00,,,,,,,2011,,,,
CTLT,2011-12-27 00:00:00+00:00,,,,,,,2011,,,,
CTLT,2011-12-28 00:00:00+00:00,,,,,,,2011,,,,
CTLT,2011-12-29 00:00:00+00:00,,,,,,,2011,,,,


Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

In [None]:
# Comment 1:
# I was getting this error while tryint to run it before converting to pands dataframe:
# "Can only rolling dataframes with known divisions"
# There is a fix for that "known divisions" error (persist() method per copilot comment)
# But I didn't spend time on it since it was not in the scope of the question.



# Comment 2:
# Yes, I think it's better to run it in Dask because it's faster and more efficient than pandas.

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.