# "Big Pandas" - Dask from the Inside
## Part 4 - Handling bigger data with Parquet
### PyData Berlin tutorial, 30 June 2017
### Stephen Simmons

In [None]:
# Complete set of Python 3.6 imports used for these examples

# Standard modules
import io
import logging
import lzma
import multiprocessing
import os
import ssl
import sys
import time
import urllib.request
import zipfile

# Third-party modules
import fastparquet      # Needs python-snappy and llvmlite
import graphviz         # To visualize Dask graphs 
import numpy as np
import pandas as pd
import psutil           # Memory stats
import dask
import dask.dataframe as dd
import bokeh.io         # For Dask profile graphs
import seaborn as sns   # For colormaps

# Support multiple lines of output in each cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Don't wrap tables
pd.options.display.max_rows = 20
pd.options.display.max_columns = 20
pd.options.display.width = 300

# Show matplotlib and bokeh graphs inline in Jupyter notebook
%matplotlib inline
bokeh.io.output_notebook()

print(sys.version)
np.__version__, pd.__version__, dask.__version__

# Loading a Dask DataFrame from multiple large CSV files

In [None]:
# Directory holding the original monthly 'flights-yyyy-mm.xz' archives.
# Set the OTP_DATA_DIR environment variable to point elsewhere instead of
# editing this absolute, machine-specific default path.
DIR_NAME = os.environ.get('OTP_DATA_DIR',
                          '/home/stephen/do-not-backup/data/usa-flights-otp')

In [None]:
def load_otp_csv(start='2015-12', end='2016-03', dir_name=None):
    """
    Dask DataFrame with BTS On-Time Performance flight data loaded from
    multiple compressed monthly csv archives named 'flights-yyyy-mm.xz'.

    Parameters
    ----------
    start, end : str
        First and last month to load, as 'YYYY-MM'. Both months are included.
    dir_name : str, optional
        Directory holding the archives. Defaults to the global DIR_NAME.

    Returns
    -------
    dask.dataframe.DataFrame
        One partition per monthly archive (compressed files are not split).

    Raises
    ------
    ValueError
        If the month range is empty (e.g. start is after end).

    Note: months used to be generated with ``pd.date_range(start, end, freq='M')``.
    'M' produces month-END dates, and ``end='2016-03'`` parses as 2016-03-01,
    so 2016-03-31 fell outside the range and the end month was silently
    dropped (what the earlier note reported as pandas GH#15886).
    ``pd.period_range`` works in whole months and includes both endpoints.
    """
    if dir_name is None:
        dir_name = DIR_NAME

    # One monthly period per archive, inclusive of both start and end.
    months = pd.period_range(start, end, freq='M')
    paths = [os.path.join(dir_name, 'flights-%s.xz' % month.strftime('%Y-%m'))
             for month in months]
    if not paths:
        raise ValueError('No months between start=%r and end=%r' % (start, end))

    cols = [
        'FlightDate', 'Origin', 'Dest', 'OriginState', 'DestState',
        'Carrier', 'FlightNum', 'TailNum', 'CRSDepTime', 'CRSArrTime',
        'DepDelay', 'ArrDelay', 'Flights', 'Cancelled', 'Diverted',
        ]

    ddf = dd.read_csv(paths,
                      dialect="excel",
                      header=0,
                      compression='xz',
                      usecols=cols,
                      encoding='latin-1',  # Avoid unicode errors
                      blocksize=None,      # Can't split compressed csv into blocks
                      parse_dates=['FlightDate'],
                      dtype={ 'FlightNum': str, }, # Don't want this as a number
                      )
    return ddf

In [None]:
%%time
# Time setting up the Dask DataFrame over the monthly archives
ddf = load_otp_csv(start='2015-12', end='2016-03')

In [None]:
%%time
# Build the per-carrier sum; the actual work runs later in task.compute()
task = ddf[['Carrier','Flights','Cancelled']].groupby('Carrier').sum()

In [None]:
# Display the (not yet computed) Dask result object
task

In [None]:
%%time
# Execute the graph: read the archives and sum per carrier
task.compute()

In [None]:
# Render the task graph (needs graphviz)
task.visualize()

# A more complex example: more source files, plus a row filter ('select')

In [None]:
# Reload over a longer span of monthly archives
ddf = load_otp_csv(start='2016-01', end='2017-02')

In [None]:
# Filter to a single day before aggregating. Dask has no idea which monthly
# file holds that date, so every partition is still part of the graph.
task = (ddf[ddf.FlightDate == '2016-01-24']
        [['Carrier', 'Flights', 'Cancelled']]
        .groupby('Carrier')
        .sum())

In [None]:
# Display the filtered, not yet computed, Dask result
task

In [None]:
# Graph for the one-day query: note how many partitions feed into it
task.visualize()

In [None]:
# Inspect the Dask DataFrame's structure (columns, dtypes, partitions)
ddf

In [None]:
# One partition per archive (blocksize=None). Divisions are presumably
# unknown here, since nothing told Dask how the files are ordered.
ddf.npartitions
ddf.divisions

We haven't given Dask any clues to optimize the computation.
It doesn't know a priori that the source data is organized by month.

In [None]:
# Index on FlightDate so Dask can track each partition's date range (divisions)
ddf2 = ddf.set_index('FlightDate')

In [None]:
# Partition count and divisions after set_index, for comparison with ddf
ddf2.npartitions
ddf2.divisions

In [None]:
# Select one day by label with explicit .loc. The `ddf2['2016-01-24']` form
# relies on a string key that is not a column falling back to row selection
# on a datetime index, a shortcut pandas deprecated (1.2) and later removed.
task2 = ddf2.loc['2016-01-24'][['Carrier','Flights','Cancelled']].groupby('Carrier').sum()

In [None]:
# Graph for the one-day query on the FlightDate-indexed frame
task2.visualize()

In [None]:
# NOTE(review): the 'Oops!' outcome is discussed in the markdown cell below.
# If this raises, consider catching and displaying the exception so that
# Restart & Run All can get past it. TODO confirm what actually goes wrong.
task2.compute()    # Oops!

The problem here is that Dask has assumed our partitioned data was completely sorted.

Remember that slicing by label can only return a single contiguous range if the index is monotonic.

We can make this happen by forcing a sort on each partition as it gets processed.

In [None]:
def sort_partition(df):
    """Return one partition re-indexed on, and sorted by, FlightDate.

    The FlightDate column itself is kept (drop=False), so a later
    ``set_index('FlightDate')`` on the Dask frame can still find it.
    """
    by_date = df.set_index('FlightDate', drop=False)
    return by_date.sort_index()

# Sort each partition independently. Ordering across partitions is not
# communicated to Dask by this step.
ddf3 = ddf.map_partitions(sort_partition)

In [None]:
# Partitions are sorted internally; check whether Dask knows the divisions
ddf3
ddf3.divisions

In [None]:
# Select one day by label with explicit .loc. The `ddf3['2016-01-24']` form
# relies on a string key that is not a column falling back to row selection
# on a datetime index, a shortcut pandas deprecated (1.2) and later removed.
task3 = ddf3.loc['2016-01-24'][['Carrier','Flights','Cancelled']].groupby('Carrier').sum()

In [None]:
# Graph for the one-day query on the per-partition-sorted frame
task3.visualize()

Here, even though each partition is sorted, the Dask graph still has to process every partition.
We can help the graph optimizer by telling the top-level Dask DataFrame that its
partitions are split on FlightDate.

In [None]:
# Index the top-level frame on the FlightDate column so Dask records the
# date boundaries (divisions) of each partition
ddf4 = ddf3.set_index('FlightDate')
ddf4
ddf4.divisions

Now the dependency graph looks very different: the evaluator can jump directly
to the correct monthly dataset.

In [None]:
# Select one day by label with explicit .loc. The `ddf4['2016-01-24']` form
# relies on a string key that is not a column falling back to row selection
# on a datetime index, a shortcut pandas deprecated (1.2) and later removed.
task4 = ddf4.loc['2016-01-24'][['Carrier','Flights','Cancelled']].groupby('Carrier').sum()
task4
task4.visualize()

In [None]:
# Execute the one-day query on the FlightDate-indexed frame
task4.compute()

# Parquet - a faster storage format than CSV

In [None]:
# Compression codecs fastparquet offers here; 'SNAPPY' (used below) needs python-snappy
fastparquet.compression.compressions

In [None]:
# Persist the FlightDate-indexed frame as Snappy-compressed parquet
ddf4.to_parquet('flights.parq', compression='SNAPPY')

What does this data look like?

In [None]:
# Redundant re-import (fastparquet is already imported at the top), but harmless
import fastparquet
# Open the parquet data written above to inspect its schema
pf = fastparquet.ParquetFile('flights.parq')

In [None]:
# Column names stored in the parquet file
pf.columns

In [None]:
# Column dtypes as recorded by fastparquet
pf.dtypes

In [None]:
%%time
ddf = dd.read_parquet('flights.parq', columns=['Carrier','Flights','Cancelled'])
x = (ddf.loc['2016-01-18':'2016-01-28']
         .reset_index()
         .groupby(by=['Carrier','FlightDate'])
         .sum()
    )
y = x['Cancelled']/x['Flights']*100
out = y.compute()

In [None]:
# Cancellation percentage per (Carrier, FlightDate)
out

In [None]:
# Carriers as rows, days as columns; cells shaded red relative to each row
# (axis=1). Styler.set_precision was deprecated in pandas 1.3 and removed in
# 2.0, so format the numbers explicitly. Styler.format with a format string
# works on both old and new pandas.
(out.unstack('FlightDate')
 .style
 .format('{:.2f}')
 .background_gradient(cmap=sns.light_palette("red", as_cmap=True),
                      high=0.4, low=0.2, axis=1)
)

In [None]:
# Task graph for the parquet-based cancellation-rate query
y.visualize()

# Visualizing Dask computations

In [None]:
import dask.diagnostics.profile as profile
from dask.diagnostics import ( ProgressBar, Profiler, 
                              ResourceProfiler, CacheProfiler )


In [None]:
ddf = dd.read_parquet('flights.parq')

# Per-carrier totals of flights, cancellations and diversions (still lazy)
sum_cols = ['Carrier', 'Flights', 'Cancelled', 'Diverted']
task = (ddf[sum_cols]
        .groupby('Carrier')
        .sum())

# Derived percentage columns, expressed relative to the flight totals
task['CancelledPct'] = task['Cancelled'] / task['Flights'] * 100
task['DivertedPct'] = task['Diverted'] / task['Flights'] * 100

In [None]:
# Run the per-carrier summary while showing a text progress bar
with ProgressBar():
    out = task.compute()

In [None]:
# Leave the DataFrame as the bare last expression so Jupyter uses its rich
# (HTML table) display rather than print()'s plain text
out

In [None]:
# Task graph for the per-carrier summary, including the percentage columns
task.visualize()

In [None]:
import dask.diagnostics
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from cachey import nbytes

# Run the graph once under all three profilers plus a progress bar.
# One multi-item `with` enters the managers left to right and exits them in
# reverse, which is the same order as the equivalent nested form.
#   Profiler()                  - per-task records
#   ResourceProfiler(dt=0.25)   - periodic resource samples (dt = sample interval)
#   CacheProfiler(metric=nbytes) - intermediate results sized by cachey's nbytes
with ProgressBar(), \
        Profiler() as prof, \
        ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler(metric=nbytes) as cprof:
    df = task.compute()

# Show the three profiles as bokeh plots in the notebook (nothing saved to disk)
dask.diagnostics.visualize([prof, rprof, cprof], save=False, show=True)

In [None]:
# The lazy Dask object behind the summary
task

In [None]:
# Summary statistics of the aggregated frame (Dask builds another lazy object)
task.describe()

In [None]:
# Private attribute: the name used for this collection's own keys in task.dask
task._name

In [None]:
# Private attribute: an empty pandas object describing the expected columns/dtypes
task._meta

In [None]:
from pprint import pprint

In [None]:
# List every key in the task graph, one per line. Dask keys may be plain
# strings or (name, index, ...) tuples, and comparing a str with a tuple raises
# TypeError in Python 3. Sorting on a tuple-normalised key avoids that while
# leaving the ordering of all-tuple keys unchanged.
print('\n'.join(map(str, sorted(task.dask.keys(),
                                key=lambda k: k if isinstance(k, tuple) else (k,)))))

In [None]:
# The hex token in a Dask key is a hash that depends on the inputs and the Dask
# version, so a hard-coded key raises KeyError on re-run. Instead, look up the
# first 'mul-' key by name prefix. Raises ValueError if no such key exists.
mul_key = min(k for k in task.dask
              if isinstance(k, tuple) and k[0].startswith('mul-'))
task.dask[mul_key]

In [None]:
# Hard-coded key tokens are not reproducible across runs or Dask versions, so
# find the groupby-sum combine keys by name prefix. Prefer the (…, 1, 0, 0)
# entry originally inspected, falling back to the first one found.
# Raises IndexError if the graph has no such keys.
combine_keys = sorted(k for k in task.dask
                      if isinstance(k, tuple)
                      and k[0].startswith('dataframe-groupby-sum-combine-'))
combine_key = next((k for k in combine_keys if k[1:] == (1, 0, 0)), combine_keys[0])
task.dask[combine_key]

In [None]:
# Graph behind selecting a single column from the parquet-backed frame
ddf.Carrier.dask