# Working with parquet files

## Objective

+ In this assignment, we will use the data downloaded with the module `data_manager` to create features.

(11 pts total)

## Prerequisites

+ This notebook assumes that price data is available to you in the environment variable `PRICE_DATA`. If you have not done so, then execute the notebook `01_materials/labs/2_data_engineering.ipynb` to create this data set.


+ Load the environment variables using dotenv. (1 pt)

In [3]:
# Write your code below.
%load_ext dotenv
%dotenv 

import os
import sys

sys.path.append(os.getenv('SRC_DIR'))
from logger import get_logger
_logs = get_logger(__name__)



In [4]:
import dask.dataframe as dd
import pandas as pd
import numpy as np



+ Load the environment variable `PRICE_DATA`.
+ Use [glob](https://docs.python.org/3/library/glob.html) to find the path of all parquet files in the directory `PRICE_DATA`.

(1pt)
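
As a minimal sketch of how a recursive glob pattern behaves, the example below builds a temporary directory tree and searches it. The folder and file names are hypothetical and only stand in for whatever layout `PRICE_DATA` actually has.

```python
import os
import tempfile
from glob import glob

# Build a throwaway directory tree that mimics partitioned price data.
# The folder and file names here are hypothetical.
with tempfile.TemporaryDirectory() as root:
    for rel in ["A/part.0.parquet", "ZTS/part.0.parquet", "A/notes.txt"]:
        path = os.path.join(root, rel)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "w").close()

    # '**' matches nested folders only when recursive=True.
    found = glob(os.path.join(root, "**", "*.parquet"), recursive=True)
    # Store paths relative to the root so the result is easy to read.
    found_rel = sorted(os.path.relpath(f, root).replace(os.sep, "/") for f in found)

print(found_rel)
```

Only the two `.parquet` files are returned; the text file is skipped because it does not match the pattern.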

In [5]:
import os
from glob import glob

# Directory that holds the parquet price data.
ft_dir = os.getenv("PRICE_DATA")
# Recursively collect every parquet file under PRICE_DATA.
ft_glob = glob(os.path.join(ft_dir, '**/*.parquet'), recursive=True)
ft_glob = [f for f in ft_glob if os.path.isfile(f)]
df_px = dd.read_parquet(ft_glob).set_index("ticker")
print(df_px.head())





             Date       Open       High        Low      Close  Adj Close  \
ticker                                                                     
A      2000-01-03  56.330471  56.464592  48.193848  51.502148  43.463036   
A      2000-01-04  48.730328  49.266811  46.316166  47.567955  40.142933   
A      2000-01-05  47.389126  47.567955  43.141991  44.617310  37.652882   
A      2000-01-06  44.080830  44.349072  41.577251  42.918453  36.219193   
A      2000-01-07  42.247852  47.165592  42.203148  46.494991  39.237461   

         Volume       sector                       subsector  year  
ticker                                                              
A       4674353  Health Care  Life Sciences Tools & Services  2000  
A       4765083  Health Care  Life Sciences Tools & Services  2000  
A       5758642  Health Care  Life Sciences Tools & Services  2000  
A       2534434  Health Care  Life Sciences Tools & Services  2000  
A       2819626  Health Care  Life Sciences Tools & S

In [150]:
len(df_px)

2779193

For each ticker and using Dask, do the following:

+ Add lags for the variables `Close` and `Adj Close`.
+ Add returns based on Adjusted Close:
    
    - `returns`: (Adj Close / Adj Close_lag) - 1

+ Add the following range: 

    - `hi_lo_range`: this is the day's High minus Low.

+ Assign the result to `dd_feat`.

(4 pt)

In [19]:
# Step 1: lag of Close within each ticker. The Close-based returns and positive_return
# columns are not required by the task; returns is overwritten in step 2.
dd_rets = (df_px.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(Close_lag_1 = x['Close'].shift(1))
).assign(
    returns = lambda x: x['Close']/x['Close_lag_1'] - 1
).assign(
    positive_return = lambda x: (x['returns'] > 0)*1
))

# Step 2: lag of Adj Close within each ticker, returns based on Adj Close, and the daily range.
dd_rets1 = (dd_rets.groupby('ticker', group_keys=False).apply(
    lambda x: x.assign(AdjClose_lag_1 = x['Adj Close'].shift(1))
).assign(
    returns = lambda x: x['Adj Close']/x['AdjClose_lag_1'] - 1
).assign(
    hi_lo_range = lambda x: (x['High']-x['Low'])
))



In [22]:
dd_rets1
dd_feat = dd_rets1
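
To check the feature definitions on something small, here is a sketch in plain pandas with toy data. The tickers and prices are made up, and the rows are assumed to be in date order within each ticker; the column names follow the ones used in this notebook.

```python
import pandas as pd

# Toy prices for two hypothetical tickers, already in date order within each ticker.
toy = pd.DataFrame({
    "ticker": ["X", "X", "X", "Y", "Y"],
    "Adj Close": [10.0, 11.0, 12.1, 50.0, 45.0],
    "High": [10.5, 11.5, 12.5, 51.0, 47.0],
    "Low": [9.5, 10.5, 11.5, 49.0, 44.0],
})

# Shift within each ticker so the first row of a ticker has no lag.
toy["AdjClose_lag_1"] = toy.groupby("ticker")["Adj Close"].shift(1)
# returns = (Adj Close / Adj Close_lag) - 1
toy["returns"] = toy["Adj Close"] / toy["AdjClose_lag_1"] - 1
# hi_lo_range = the day's High minus Low
toy["hi_lo_range"] = toy["High"] - toy["Low"]

print(toy[["ticker", "returns", "hi_lo_range"]])
```

The first row of each ticker has a missing return because there is no previous day to compare against, which is the behaviour the per-ticker `shift(1)` is meant to produce.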

+ Convert the Dask data frame to a pandas data frame. 
+ Add a rolling average return calculation with a window of 10 days.
+ *Tip*: Consider using `.rolling(10).mean()`.

(3 pt)

In [23]:
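# Write your code below.
dd_feat_pandas = dd_feat.compute()
# Roll within each ticker so a window never mixes returns from two tickers.
dd_feat_pandas['rolling_avg_return'] = (
    dd_feat_pandas.groupby('ticker')['returns'].transform(lambda s: s.rolling(window=10).mean())
)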
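
A rolling window applied to a whole column runs across ticker boundaries, while a grouped window restarts for every ticker. The sketch below illustrates the difference on toy data; the tickers and returns are hypothetical, and a window of 2 is used only to keep the example short.

```python
import pandas as pd

# Toy returns for two hypothetical tickers, three rows each, indexed by ticker.
toy = pd.DataFrame(
    {"returns": [0.01, 0.02, 0.03, 0.10, 0.20, 0.30]},
    index=pd.Index(["X", "X", "X", "Y", "Y", "Y"], name="ticker"),
)

# Window over the whole column: the first Y window also contains X's last return.
toy["roll_all"] = toy["returns"].rolling(2).mean()

# Window restarted for each ticker: the first row of every ticker is NaN.
toy["roll_by_ticker"] = (
    toy.groupby("ticker")["returns"].transform(lambda s: s.rolling(2).mean())
)

print(toy)
```

In the ungrouped column, the first `Y` row averages `0.03` from `X` with `0.10` from `Y`; in the grouped column that row is missing because `Y` has no earlier observation.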

In [25]:
dd_feat_pandas
sorted_dd_feat = dd_feat_pandas.sort_values(by='ticker')
print(sorted_dd_feat)

             Date        Open        High         Low       Close   Adj Close  \
ticker                                                                          
A      2018-09-28   70.690002   71.000000   70.459999   70.540001   67.407463   
A      2022-03-21  137.630005  139.789993  137.009995  138.139999  135.558334   
A      2022-03-18  136.210007  139.389999  136.210007  139.119995  136.520020   
A      2022-03-17  134.000000  136.850006  133.610001  136.820007  134.263000   
A      2022-03-16  131.929993  135.610001  131.300003  135.000000  132.477020   
...           ...         ...         ...         ...         ...         ...   
ZTS    2017-12-01   72.220001   72.349998   71.279999   72.300003   68.791153   
ZTS    2017-12-04   72.440002   72.790001   71.000000   71.019997   67.573257   
ZTS    2017-12-05   70.820000   71.769997   70.599998   70.849998   67.411514   
ZTS    2017-11-21   71.029999   72.139999   70.970001   71.360001   67.896774   
ZTS    2013-03-05   34.41000

Please comment:

+ Was it necessary to convert to pandas to calculate the moving average return?
+ Would it have been better to do it in Dask? Why?

(1 pt)

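No, it was not necessary to convert to pandas; the rolling average could have been computed directly in Dask, for example inside the same per-ticker `groupby(...).apply(...)` used for the lags.
Doing it in Dask can be more efficient because Dask processes partitions in parallel and generally handles large datasets better than pandas, which needs the whole data set in memory.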

## Criteria

The [rubric](./assignment_1_rubric_clean.xlsx) contains the criteria for grading.

## Submission Information

🚨 **Please review our [Assignment Submission Guide](https://github.com/UofT-DSI/onboarding/blob/main/onboarding_documents/submissions.md)** 🚨 for detailed instructions on how to format, branch, and submit your work. Following these guidelines is crucial for your submissions to be evaluated correctly.

### Submission Parameters:
* Submission Due Date: `HH:MM AM/PM - DD/MM/YYYY`
* The branch name for your repo should be: `assignment-1`
* What to submit for this assignment:
    * This Jupyter Notebook (assignment_1.ipynb) should be populated and should be the only change in your pull request.
* What the pull request link should look like for this assignment: `https://github.com/<your_github_username>/production/pull/<pr_id>`
    * Open a private window in your browser. Copy and paste the link to your pull request into the address bar. Make sure you can see your pull request properly. This helps the technical facilitator and learning support staff review your submission easily.

Checklist:
- [ ] Created a branch with the correct naming convention.
- [ ] Ensured that the repository is public.
- [ ] Reviewed the PR description guidelines and adhered to them.
- [ ] Verified that the link is accessible in a private browser window.

If you encounter any difficulties or have questions, please don't hesitate to reach out to our team via our Slack at `#cohort-3-help`. Our Technical Facilitators and Learning Support staff are here to help you navigate any challenges.