# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [None]:
# Write your code below.
%load_ext dotenv
%dotenv 


In [66]:
import dask.dataframe as dd

+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1pt)

In [73]:
import os
from glob import glob

# pointing all parquet files in the directory to price data 
all_parquet_files = glob(os.path.join(os.getenv('PRICE_DATA'),"**/**/*.parquet"))


For each ticker and using Dask, do the following:

+ Add lags for variables Close and Adj_Close.
+ Add returns based on Close:
    
    - `returns`: (Close / Close_lag_1) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [None]:
import pandas as pd
import dask.dataframe as dd

# 1. define meta for Dask, just a shortcut because DASK is lazy:
meta_df = ddf.head(0).assign(Close_lag_1=0.0, Adj_Close_lag_1=0.0, returns=0.0, hi_lo_range=0.0)

# 2. Perform the groupby and apply
dd_feat = (
    ddf.groupby('ticker', group_keys=False)
       .apply(
            lambda x: x.sort_values('Date', ascending=True)
                       .assign(
                           Close_lag_1 = x['Close'].shift(1),
                           Adj_Close_lag_1 = x['Adj Close'].shift(1),
                           returns = lambda df: (df['Close'] / df['Close'].shift(1)) - 1,
                           hi_lo_range = lambda df: df['High'] - df['Low']
                       ), 
            meta=meta_df
    )
)

In [None]:
dd_feat #I just want to see how it looks

Unnamed: 0_level_0,Date,Open,High,Low,Close,Adj Close,Volume,source,ticker,Year,Close_lag_1,Adj_Close_lag_1,returns,hi_lo_range
npartitions=3068,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
,datetime64[ns],float64,float64,float64,float64,float64,float64,string,string,int32,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [70]:
# Convert dask to pandas
df_feat = dd_feat.compute()

# Add a 10-day moving average of returns per ticker

df_feat['returns_ma_10'] = (
    df_feat.groupby('ticker')['returns']
           .transform(lambda x: x.rolling(window=10).mean())
)

# Preview the result
print(df_feat.head())

             Date       Open       High        Low      Close  Adj Close  \
114997 2013-06-17  43.200001  50.000000  43.200001  45.000000  45.000000   
114998 2013-06-18  44.500000  45.000000  43.500000  44.000000  44.000000   
114999 2013-06-19  44.000000  44.049999  43.000000  43.099998  43.099998   
115000 2013-06-20  43.000000  43.500000  42.000000  42.939999  42.939999   
115001 2013-06-21  43.290001  43.500000  42.040001  43.500000  43.500000   

           Volume   source ticker  Year  Close_lag_1  Adj_Close_lag_1  \
114997   236700.0  MNK.csv    MNK  2013     3.490000         3.490000   
114998  1082600.0  MNK.csv    MNK  2013    45.000000        45.000000   
114999   540800.0  MNK.csv    MNK  2013    44.000000        44.000000   
115000  1013600.0  MNK.csv    MNK  2013    43.099998        43.099998   
115001   913700.0  MNK.csv    MNK  2013    42.939999        42.939999   

         returns  hi_lo_range  returns_ma_10  
114997       NaN     6.799999            NaN  
114998 -0.

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

From what I have read, it is not necessary but highly practical as DASK splits into partition, rolling average would require a data overlap. In pandas the operation .groupby().transform() simplifies the process. 
From what I have learned, if Pandas can handle it, RAM wise, there is no need to do it in DASK. But if the database is so big, df_compute() might crash the system and it is this case that DASK is better. 

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.