![Königsweg Logo](../img/koenigsweg_150.png)

<span style="font-size: small;float: right;">&copy; 2015-2020 Alexander C.S. Hendorf, <a href="http://koenigsweg.com">Königsweg GmbH</a>, Mannheim </span>

---

# Analytics with Pandas and JupyterLab

---

# Scaling and Optimizing Performance

---

In [3]:
import numpy as np
import pandas as pd
import fastparquet as parquet
import dask
%matplotlib inline
from datetime import datetime as dt

In [4]:
large_file_path = '../data/blooth_sales_data_big.json'  # 42 MB json

---

### Categorical

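If you work with a table that contains a lot of repetitive data, a Categorical can be a good option to save space. It is basically a lookup table: each distinct value is stored once, and every row holds only a small integer code that points to it.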
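To make the lookup-table idea concrete, here is a minimal sketch on a hypothetical toy column (the values are made up for illustration and are not read from the sales file). It shows the two parts of a Categorical: the list of distinct values and the per-row integer codes.

```python
import pandas as pd

# Hypothetical toy column with only three distinct values.
products = pd.Series(["Lipitor", "Corolla", "Lipitor", "iPhone", "Lipitor"])
as_category = products.astype("category")

# The lookup table: every distinct value is stored exactly once.
lookup = list(as_category.cat.categories)

# The column itself: one small integer code per row, pointing into the lookup table.
codes = as_category.cat.codes.tolist()

print(lookup)
print(codes)
```

The more often values repeat, the more the integer codes save compared with storing a full Python string per row.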

In [5]:
tiny_big_set = pd.read_json(large_file_path)

In [6]:
tiny_big_set.head(3)

Unnamed: 0,birthday,customer,name,orderdate,product,unitprice,units
0,1985-05-05,Data International,Marcela,2014-11-17 09:25:21.027327,Lipitor,10.36,14
1,1979-08-20,Alpha Studio LLC,Dara,2014-11-07 09:25:21.027364,Lipitor,10.54,1
2,1982-09-06,General Interactive Analysis Limited,Jc,2014-10-29 09:25:21.027394,Corolla,23554.09,4


In [7]:
tiny_big_set.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 260090 entries, 0 to 260089
Data columns (total 7 columns):
birthday     260090 non-null object
customer     260090 non-null object
name         260090 non-null object
orderdate    260090 non-null object
product      260090 non-null object
unitprice    260090 non-null float64
units        260090 non-null int64
dtypes: float64(1), int64(1), object(5)
memory usage: 13.9+ MB


In [8]:
tiny_big_set.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 260090 entries, 0 to 260089
Data columns (total 7 columns):
birthday     260090 non-null object
customer     260090 non-null object
name         260090 non-null object
orderdate    260090 non-null object
product      260090 non-null object
unitprice    260090 non-null float64
units        260090 non-null int64
dtypes: float64(1), int64(1), object(5)
memory usage: 94.0 MB


In [9]:
tiny_big_set.memory_usage()

Index             80
birthday     2080720
customer     2080720
name         2080720
orderdate    2080720
product      2080720
unitprice    2080720
units        2080720
dtype: int64

In [10]:
tiny_big_set.memory_usage(deep=True)

Index              80
birthday     17426030
customer     20771397
name         16357841
orderdate    21587470
product      18298210
unitprice     2080720
units         2080720
dtype: int64

In [11]:
tiny_big_set['product'] = tiny_big_set['product'].astype('category')

In [12]:
tiny_big_set.memory_usage(deep=True)

Index              80
birthday     17426030
customer     20771397
name         16357841
orderdate    21587470
product        261126
unitprice     2080720
units         2080720
dtype: int64
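The same conversion may pay off for other repetitive columns, but not for columns where almost every value is unique. The following is a rough sketch, not part of the original notebook: `categorize_repetitive` and its `max_unique_ratio` threshold are hypothetical names chosen for illustration, and the sample data is synthetic.

```python
import pandas as pd


def categorize_repetitive(frame, columns, max_unique_ratio=0.5):
    """Return a copy in which repetitive columns are stored as category.

    A column is converted only if its share of distinct values is at most
    max_unique_ratio (an arbitrary, illustrative threshold).
    """
    result = frame.copy()
    for column in columns:
        unique_ratio = result[column].nunique() / max(len(result), 1)
        if unique_ratio <= max_unique_ratio:
            result[column] = result[column].astype("category")
    return result


# Synthetic sample: 'product' repeats heavily, 'order_id' is unique per row.
sample = pd.DataFrame({
    "product": ["Lipitor", "Corolla", "iPhone"] * 1000,
    "order_id": [f"id-{i}" for i in range(3000)],
    "units": range(3000),
})

compact = categorize_repetitive(sample, columns=["product", "order_id"])
before = sample.memory_usage(deep=True).sum()
after = compact.memory_usage(deep=True).sum()
print(before, after)
```

Only `product` is converted here; `order_id` stays as it is because a lookup table with one entry per row would save nothing.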

---

### Parquet

Apache Parquet is
* a free and open-source column-oriented data storage format from the Apache Hadoop ecosystem,
* a top-level Apache Software Foundation (ASF) project,
* built from the ground up with complex nested data structures in mind.

Benefits:
* Column-wise compression is efficient and saves storage space.
* Compression techniques specific to a type can be applied, as the values in a column tend to be of the same type.
* Queries that fetch specific column values need not read the entire row data, which improves performance.
* Different encoding techniques can be applied to different columns.
* Parquet can be used from a number of programming languages, such as C++, Java, Python, PHP, …
* Storage costs drop and querying becomes more effective (e.g. with serverless technologies).


In [13]:
start = dt.utcnow()
df = pd.read_json(large_file_path)
took = dt.utcnow() - start
took.total_seconds()

1.388805

In [14]:
df.to_parquet(f'{large_file_path}.parquet.gzip', compression='gzip')

In [15]:
start = dt.utcnow()
df = pd.read_parquet(f'{large_file_path}.parquet.gzip')
took = dt.utcnow() - start
took.total_seconds()

0.325522
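For repeated measurements like the two above, a small timing helper can reduce copy-and-paste. The sketch below is one possible approach, not the notebook's own method: `stopwatch` is a hypothetical helper, and it uses `time.perf_counter`, a monotonic clock intended for measuring elapsed time, instead of `datetime.utcnow`.

```python
import time
from contextlib import contextmanager


@contextmanager
def stopwatch(result):
    """Store the elapsed wall-clock seconds of the with-block in result['seconds']."""
    start = time.perf_counter()
    try:
        yield result
    finally:
        result["seconds"] = time.perf_counter() - start


timing = {}
with stopwatch(timing):
    # Stand-in workload; in the notebook this would be the read_json or read_parquet call.
    total = sum(range(1_000_000))

print(timing["seconds"])
```

Single-run timings like these vary between runs and are also affected by the operating system's file cache, so they are best read as rough indications.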

---

### Dask

#### Dask natively scales Python.

Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love, such as
* Pandas
* Numpy
* Scikit-Learn

We can summarize the basics of Dask as follows:
* process data that doesn't fit into memory by breaking it into blocks and specifying task chains
* parallelize execution of tasks across cores and even nodes of a cluster
* move computation to the data rather than the other way around, to minimize communication overheads
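The first point, processing data block by block, can be illustrated without Dask. The sketch below uses plain pandas with `chunksize` on a small in-memory CSV (made-up rows) to compute a maximum one block at a time; it only demonstrates the idea, whereas Dask additionally builds a task graph and runs the blocks in parallel.

```python
import io

import pandas as pd

# Small made-up CSV; a real use case would be a file too large for memory.
csv_text = (
    "units,unitprice\n"
    "14,10.36\n"
    "1,10.54\n"
    "4,23554.09\n"
    "30,506.18\n"
    "21,10.10\n"
)

block_maxes = []
# chunksize=2 yields DataFrames of at most two rows each.
for block in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    block["total"] = block["units"] * block["unitprice"]
    block_maxes.append(block["total"].max())

# Combine the per-block results into the overall result.
overall_max = max(block_maxes)
print(len(block_maxes), overall_max)
```

Only one block is held in memory at a time, and the per-block results are small enough to combine at the end.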

In [16]:
# preprocessing: create artificial data by writing the same frame to five CSV files
df['total'] = df.units * df.unitprice
for i in range(5):
    df.to_csv(f'/tmp/data_for_dask_{i}.csv')

Pandas is great for tabular datasets that fit in memory. 
Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM. 

The dask.dataframe module implements a blocked parallel DataFrame object that mimics a large subset of the Pandas DataFrame. One Dask DataFrame is comprised of many in-memory pandas DataFrames separated along the index. One operation on a Dask DataFrame triggers many pandas operations on the constituent pandas DataFrames in a way that is mindful of potential parallelism and memory constraints.

In [17]:
import dask
filename = '/tmp/data_for_dask_*.csv'  # glob pattern matching all five CSV files

In [18]:
import dask.dataframe as dd
df = dd.read_csv(filename)
# read_csv is lazy; head() loads only the first rows for a preview
df.head()

Unnamed: 0.1,Unnamed: 0,birthday,customer,name,orderdate,product,unitprice,units,total
0,0,1985-05-05,Data International,Marcela,2014-11-17 09:25:21.027327,Lipitor,10.36,14,145.04
1,1,1979-08-20,Alpha Studio LLC,Dara,2014-11-07 09:25:21.027364,Lipitor,10.54,1,10.54
2,2,1982-09-06,General Interactive Analysis Limited,Jc,2014-10-29 09:25:21.027394,Corolla,23554.09,4,94216.36
3,3,1980-05-23,Power Source Studio Inc,Alfonzo,2014-11-02 09:25:21.027420,iPhone,506.18,30,15185.4
4,4,1973-07-09,Atlantic Electronics Organization,Clay,2014-11-18 09:25:21.027446,Lipitor,10.1,21,212.1


In [19]:
len(df)

1300450

##### **Pandas** way

In [20]:
start = dt.utcnow()

maxes = []
for fn in [f'/tmp/data_for_dask_{i}.csv' for i in range(5)]:
    pdf = pd.read_csv(fn)
    maxes.append(pdf.total.max())
    
took = dt.utcnow() - start
took.total_seconds(), max(maxes)

(2.515762, 249950.7)

##### **Dask** way

In [21]:
start = dt.utcnow()

# keep the Dask result so the cell reports it instead of the pandas value
dask_max = df.total.max().compute()

took = dt.utcnow() - start
took.total_seconds(), dask_max

(2.231787, 249950.7)

![Dask](../img/dask-compute.gif)

---