# Resampling data using Dask

#### Description:

This codebook shows how to resample daily share price data to a weekly frequency using Dask.

#### Skill level:

- Intermediate

### Import the required libraries
-------------------------

In [1]:
import os
import sys

# Resolve the platform root: three directory levels above the current
# working directory. Paths to the DATA folder are built from it below.
platform_path = os.path.abspath(os.path.join(os.path.abspath(''), '../../../'))

# Guard the append so that re-running this cell does not keep adding the
# same entry to sys.path.
if platform_path not in sys.path:
    sys.path.append(platform_path)

In [2]:
# Uncomment to install the parquet engine used by the cells below:
# %pip install pyarrow

In [3]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, progress

-------------------------
### Read data into a dataframe

In [4]:
# The share prices are stored as a gzip-compressed CSV in the platform's DATA folder.
share_price_csv = os.path.join(platform_path, 'DATA/share_price.csv.gz')
df_raw = pd.read_csv(share_price_csv, compression='gzip')

-------------------------
### Check the shape and head of the dataframe

In [5]:
# (rows, columns) of the raw frame as loaded from the CSV.
df_raw.shape

(1000000, 11)

In [6]:
# Preview the first five rows of the raw frame.
df_raw.head()

Unnamed: 0,ticker,date,open,high,low,close,volume,dividends,closeunadj,lastupdated,dimension
0,A,2021-02-05,124.51,125.96,123.11,123.18,1919704.0,0.0,123.18,2021-02-05,D
1,AA,2021-02-05,20.58,20.98,20.22,20.95,5235006.0,0.0,20.95,2021-02-05,D
2,AAC.U,2021-02-05,10.7,10.75,10.55,10.55,528129.0,0.0,10.55,2021-02-05,D
3,AACG,2021-02-05,10.02,10.12,5.6,6.33,18333641.0,0.0,6.33,2021-02-05,D
4,AACQ,2021-02-05,11.232,11.74,11.19,11.3,1053149.0,0.0,11.3,2021-02-05,D


-------------------------
### Check and convert column datatypes

In [7]:
# Column dtypes as inferred by read_csv, before any conversion.
df_raw.dtypes

ticker          object
date            object
open           float64
high           float64
low            float64
close          float64
volume         float64
dividends      float64
closeunadj     float64
lastupdated     object
dimension       object
dtype: object

In [8]:
# Target dtype for each column of the share price table.
dic_dtypes_sharadar_sep = {
    'ticker': 'object',
    'dimension': 'object',
    'date': 'datetime64',
    'open': 'float64',
    'high': 'float64',
    'low': 'float64',
    'closeunadj': 'float64',
    'close': 'float64',
    'volume': 'float64',
    'dividends': 'float64',
    'lastupdated': 'datetime64',
}

dic_dtypes = dic_dtypes_sharadar_sep

# Work on a copy: assigning converted columns to an alias of `df_raw` would
# silently rewrite the raw frame that the earlier cells displayed.
df = df_raw.copy()

# Values that cannot be parsed become NaN/NaT (errors='coerce') instead of
# raising. Columns mapped to 'object' are left untouched.
# The loop variable is deliberately not called `type`, which would shadow the
# builtin for the rest of the notebook.
for col, target_dtype in dic_dtypes.items():
    if target_dtype == 'int64':
        # Integers cannot represent NaN, so fill the gaps with 0 before casting.
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('int64')
    elif target_dtype == 'float64':
        df[col] = pd.to_numeric(df[col], errors='coerce')
    elif target_dtype == 'datetime64':
        df[col] = pd.to_datetime(df[col], errors='coerce')

-------------------------
### Sort values and drop any duplicates

In [10]:
# Columns that together identify a single price record.
unique_cols = ['ticker', 'dimension', 'date']

# Order by the key, discard rows with a missing key part, keep the first row
# of every duplicated key, and finish with a fresh 0..n-1 index.
df = (
    df.sort_values(by=unique_cols)
      .dropna(subset=unique_cols, how='any')
      .drop_duplicates(subset=unique_cols, keep='first')
      .reset_index(drop=True)
)

-------------------------
### Split data into chunks and write them in Parquet format

In [11]:
# Index the frame by ticker while keeping 'ticker' as a regular column too.
df = df.set_index('ticker', drop=False)

# Number of parquet datasets the tickers are spread over.
chunks = 6

# Distinct tickers, divided into `chunks` groups of near-equal size.
ls_tickers = df['ticker'].unique()
ticker_chunks = np.array_split(ls_tickers, chunks)

# NOTE(review): re-running this cell may add files to an already existing
# dataset directory instead of replacing it - confirm for the pyarrow
# version in use.
for i, c in enumerate(ticker_chunks):
    print('INFO: writing chunk {}'.format(i))

    chunk_path = os.path.join(platform_path, 'DATA/share_prices_in/', '{}.parq'.format(i))
    in_chunk = df['ticker'].isin(c)

    # One dataset per chunk, partitioned on disk by ticker.
    df[in_chunk].to_parquet(chunk_path, index=True, engine='pyarrow',
                            partition_cols=['ticker'])

INFO: writing chunk 0
INFO: writing chunk 1
INFO: writing chunk 2
INFO: writing chunk 3
INFO: writing chunk 4
INFO: writing chunk 5


-------------------------
### Resample the daily chunk data to weekly frequency and write it back in Parquet format

In [12]:
# Resampling rule handed to DataFrame.resample ("W" = weekly).
dimension = "W"

# Aggregation applied to each price column within one resampling period.
agg_dict = {
    'open': 'first',
    'high': 'max',
    'low': 'min',
    'closeunadj': 'last',
    'close': 'last',
    'volume': 'sum',
    'dividends': 'sum',
    'lastupdated': 'last',
}

# The ticker is carried through by taking the last value in each period.
ticker_dict = {'ticker': 'last'}


def dim_red(df):
    """Resample a frame of price rows to the frequency given by `dimension`.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a datetime 'date' column, every column named in
        `agg_dict`, and a 'ticker' column.

    Returns
    -------
    pandas.DataFrame
        One row per resampling period, indexed by 'date', with the columns
        aggregated as specified in `agg_dict` and `ticker_dict`.

    Notes
    -----
    The frame is resampled as a whole: rows of every ticker present in `df`
    fall into the same bins.
    NOTE(review): presumably each frame passed in holds a single ticker -
    verify against the caller.
    """
    aggregations = dict(agg_dict)
    aggregations.update(ticker_dict)

    by_date = df.set_index('date')
    return by_date.resample(dimension).agg(aggregations)


# Directories holding the daily input datasets and the resampled output.
# Built with os.path.join: `platform_path` has no trailing separator, so plain
# string concatenation would point at a different directory.
dir_in = os.path.join(platform_path, 'DATA/share_prices_in/')
dir_out = os.path.join(platform_path, 'DATA/share_prices_out/')

# Only the chunk datasets are processed; any other entry in the input
# directory (e.g. checkpoint or OS metadata files) is ignored.
parq_names = sorted(name for name in os.listdir(dir_in) if name.endswith('.parq'))

for i, parq in enumerate(parq_names):
    print('INFO: processing chunk {}'.format(i))

    # Read the dataset by the name found on disk rather than rebuilding the
    # name from the loop counter.
    # NOTE(review): 'ticker' is not among the requested columns; it is
    # expected to come back through reset_index() below - verify.
    ddf = dd.read_parquet(os.path.join(dir_in, parq),
                          columns=['date', 'open', 'high', 'low', 'closeunadj',
                                   'close', 'volume', 'dividends', 'lastupdated'],
                          engine='pyarrow')

    ddf = ddf.reset_index()

    # Resample every dask partition independently, then collect the result
    # into a single pandas frame.
    df = ddf.map_partitions(dim_red).compute()

    # Turn the 'date' index produced by the resampling back into a column.
    df = df.reset_index()

    df.to_parquet(os.path.join(dir_out, parq),
                  index=True, engine='pyarrow', partition_cols=['ticker'])

INFO: processing chunk 0
INFO: processing chunk 1
INFO: processing chunk 2
INFO: processing chunk 3
INFO: processing chunk 4
INFO: processing chunk 5
