* Getting started with dask
* Overview of dask features
* Data processing using dask dataframes
* Create dask dataframe using csv files
* Get the row and column count
* Overview of data processing APIs of dask dataframes
* Write data in dask dataframe to csv files
* Real world example of data processing using dask
* Exercise and Solution

In [None]:
# Getting started with dask
# python -m pip install "dask[complete]"

In [None]:
# Overview of dask features
# Scale PyData libraries: pandas with Dask DataFrames, NumPy with Dask Arrays, scikit-learn with Dask-ML
# Scale any Python code using Dask Futures
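Futures are the piece of Dask that works with plain Python functions rather than DataFrames. The sketch below assumes the `distributed` scheduler is installed (it is part of `dask[complete]`) and uses `Client(processes=False)` so that everything runs inside one local process. The `square` function is only an illustration.

```python
# Minimal Dask Futures sketch, assuming dask.distributed is installed.
from dask.distributed import Client


def square(x):
    # Any ordinary Python function can be submitted as a task
    return x * x


with Client(processes=False) as client:
    # submit() schedules a single call and returns a Future right away
    future = client.submit(square, 4)
    single = future.result()  # blocks until the task has finished

    # map() schedules one task per input; gather() collects the results in input order
    futures = client.map(square, range(5))
    results = client.gather(futures)

print(single, results)
```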

In [None]:
# Data processing using Dask DataFrames
# Read data from files and other sources using the read_* APIs (read_csv, read_json, read_parquet, etc.)
# Process data using pandas-like DataFrame APIs (query, apply, groupby, merge/join, sort_values, etc.)
# Write data to files and other targets using the to_* APIs (to_csv, to_json, to_parquet, etc.)

In [None]:
from dask import dataframe as dd

In [None]:
# Read JSON Lines files with dd.read_json
# (df.to_json is the matching writer)
df = dd.read_json(
    'data/retail_db_json/departments/*',
    lines=True
)

In [None]:
type(df)

In [None]:
# Create dask dataframe using csv files
df = dd.read_csv(
    'data/retail_db/departments/*',
    names=['department_id', 'department_name']
)

In [None]:
df.head()

In [None]:
df.shape  # column count is known up front; row count is lazy until computed

In [None]:
df.compute()

In [None]:
# Get the row and column count
df.compute().shape

In [None]:
type(df.compute())

In [None]:
# Overview of data processing APIs of dask dataframes
# query
# apply
# groupby
# join
# sort_values

In [None]:
df.compute()

In [None]:
df.query('department_id >= 3')

In [None]:
df.query('department_id >= 3').compute()

In [None]:
# Without meta, Dask warns and guesses the output type by running the function on dummy data
df.apply(lambda rec: rec['department_name'].upper(), axis=1)

In [None]:
df.apply(lambda rec: rec['department_name'].upper(), meta=(None, 'object'), axis=1).compute()

In [None]:
df.sort_values(by=['department_name'])

In [None]:
df.sort_values(by=['department_name']).compute()

In [None]:
# Write data in dask dataframe to csv files
df = dd.read_json(
    'data/retail_db_json/departments/*',
    lines=True
)

In [None]:
df.compute()

In [None]:
help(df.to_csv)

In [None]:
df.to_csv(
    'data/retail_db_csv/departments/part-*.csv', 
    index=False,
    name_function=lambda i: '%05d' % i
)

In [None]:
# Real world example of data processing using dask
# Convert all the files under retail_db to json format
import glob
import os
import json
import uuid
from dask import dataframe as dd

In [None]:
def get_columns(ds):
    # Return the column names for dataset `ds`, ordered by column_position,
    # or print a message and return None if schemas.json has no entry for it
    with open('data/retail_db/schemas.json') as fp:
        schemas = json.load(fp)
    schema = schemas.get(ds)
    if not schema:
        print(f'Schema not found for {ds}')
        return None
    cols = sorted(schema, key=lambda s: s['column_position'])
    return [col['column_name'] for col in cols]

In [None]:
get_columns('departments')

In [None]:
get_columns('orders')

In [None]:
get_columns('dummy')
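`get_columns` relies on the layout of `schemas.json`: a mapping from dataset name to a list of column entries, each with at least `column_name` and `column_position`. The sketch below uses a hypothetical excerpt in that shape (only those two keys are assumed) and a hypothetical helper `columns_from_schema` to show why the sort matters: entries may be listed in any order, and sorting by position lines the names up with the CSV columns.

```python
# Hypothetical excerpt of schemas.json, assuming the layout get_columns expects:
# dataset name -> list of {column_name, column_position} entries (in any order)
schemas = {
    'departments': [
        {'column_name': 'department_name', 'column_position': 2},
        {'column_name': 'department_id', 'column_position': 1},
    ]
}


def columns_from_schema(schema):
    # Hypothetical helper: order entries by column_position, then keep only the names
    cols = sorted(schema, key=lambda s: s['column_position'])
    return [col['column_name'] for col in cols]


print(columns_from_schema(schemas['departments']))
```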

In [None]:
import pandas as pd  # this first version uses plain pandas, one file at a time

os.makedirs('data/retail_demo', exist_ok=True)
for path in glob.glob('data/retail_db/*'):
    if os.path.isdir(path):
        ds = os.path.split(path)[1]
        for file in glob.glob(f'{path}/*'):
            df = pd.read_csv(file, names=get_columns(ds))
            os.makedirs(f'data/retail_demo/{ds}', exist_ok=True)
            df.to_json(
                f'data/retail_demo/{ds}/part-{str(uuid.uuid1())}.json',
                orient='records',
                lines=True
            )
            print(f'Number of records processed for {os.path.split(file)[1]} in {ds} is {df.shape[0]}')

In [None]:
for path in glob.glob('data/retail_db/*'):
    if os.path.isdir(path):
        ds = os.path.split(path)[1]
        df = dd.read_csv(
            f'{path}/*',
            names=get_columns(ds)
        )
        print(f'Number of records in {ds}: {df.compute().shape[0]}')

In [None]:
# Write JSON with default file names: '*' is replaced by the partition number
for path in glob.glob('data/retail_db/*'):
    if os.path.isdir(path):
        ds = os.path.split(path)[1]
        df = dd.read_csv(
            f'{path}/*',
            names=get_columns(ds)
        )
        df.to_json(
            f'data/retail_demo_json/{ds}/part-*.json',
            orient='records',
            lines=True
        )
        print(f'Number of records processed using {ds}: {df.compute().shape[0]}')

In [None]:
# Same, but name_function zero-pads the partition number (part-00000.json, part-00001.json, ...)
for path in glob.glob('data/retail_db/*'):
    if os.path.isdir(path):
        ds = os.path.split(path)[1]
        df = dd.read_csv(
            f'{path}/*',
            names=get_columns(ds)
        )
        df.to_json(
            f'data/retail_demo_json/{ds}/part-*.json',
            orient='records',
            lines=True,
            name_function=lambda i: '%05d' % i
        )
        print(f'Number of records processed using {ds}: {df.compute().shape[0]}')

In [None]:
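# Same, but name_function generates a unique UUID-based name for each file.
# All three variants write to data/retail_demo_json, so files from earlier runs with different names stay in place.
for path in glob.glob('data/retail_db/*'):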
    if os.path.isdir(path):
        ds = os.path.split(path)[1]
        df = dd.read_csv(
            f'{path}/*',
            names=get_columns(ds)
        )
        df.to_json(
            f'data/retail_demo_json/{ds}/part-*.json',
            orient='records',
            lines=True,
            name_function=lambda _: str(uuid.uuid1())
        )
        print(f'Number of records processed using {ds}: {df.compute().shape[0]}')

In [None]:
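# Differences between Pandas and Dask (for this file-conversion task)
# Complexity: Dask is relatively easy; the code is almost the same as with Pandas
# Readability: Dask reads better, since one read_csv/to_json call handles a whole folder without a per-file loop
# Maintainability: Dask is easier to maintain, with less looping and file-naming code
# Performance:
#   Pandas for low volume or small data that fits comfortably in memory
#   Dask for large volume or large data, processed in parallel across partitions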

* Exercise: Convert the text files under `data/nyse_all/nyse_data` to JSON. Use `dask` both to read the data into a dataframe and to write it out in JSON format.
  * Source folder: `data/nyse_all/nyse_data`
  * Target folder: `data/nyse_all/nyse_json`
  * File Format: `gzip` compressed json format.
  * Column Names: `['ticker', 'trade_date', 'open_price', 'low_price', 'high_price', 'close_price', 'volume']`
  * Make sure the file names follow the pattern `part-nnnnn.json.gz` (e.g. `part-00000.json.gz`)
  * Validate by comparing the shape of the data at the source and target locations.
  * Monitor the overall execution time