<br><br>

## Handling Larger Datasets


In [None]:
from pathlib import Path
import sys
import time

import numpy as np
import pandas as pd
import random
import seaborn as sns
from sklearn.preprocessing import LabelBinarizer, StandardScaler

<br><br>
A profiling decorator. Place it above a function that takes a long time, then call the function. Afterwards, examine the function's **time_results** attribute to see how long (in seconds) the most recent call took.

In [None]:
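import functools

def profile(orig):
    @functools.wraps(orig)                      # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()             # high-resolution clock intended for timing
        ret = orig(*args, **kwargs)
        finish = time.perf_counter()
        wrapper.time_results = finish - start   # seconds taken by the most recent call
        return ret
    wrapper.time_results = 0.0                  # no call has been timed yet
    return wrapper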

<br><br>
A fairly generic function for reading a CSV file. Because it is decorated with `@profile`, we can check **load_data.time_results** afterwards to see how long the load took.

In [None]:
@profile
def load_data(filename, sep=',', names=None, usecols=None, header='infer', nrows=None, skiprows=None):
    data = pd.read_csv(filename, sep=sep, header=header, low_memory=False, 
                       names=names, usecols=usecols, nrows=nrows, skiprows=skiprows)
    return data

<br><br>
Our raw dataset fits into memory, but let's examine the resources it uses.
<br><br>

In [None]:
filename = Path('helpfulness_reviews.csv')
if not filename.exists():
    filename = Path('helpfulness_reviews_smaller.csv')
    
if not filename.exists():
    raise FileNotFoundError(f'{filename} does not exist. Cannot continue.')

In [None]:
df = load_data(filename)
df.shape

In [None]:
load_data.time_results

In [None]:
df.head()

In [None]:
df.info()
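By default, `df.info()` and `memory_usage()` do not look inside object (string) columns, which is why `info()` may print a `+` after its memory figure. The sketch below compares the shallow and deep measurements on a small made-up DataFrame (the column names and values are illustrative only, and newer pandas versions may store strings in a dedicated string dtype instead of `object`):

```python
import pandas as pd

# Toy data: one numeric column and one string column (values are made up).
toy = pd.DataFrame({
    "Score": [5, 4, 3, 5] * 1000,
    "Summary": ["Great product", "Not bad", "Okay", "Loved it"] * 1000,
})

# Shallow estimate: for object-dtype strings, only the 8-byte pointers are counted.
shallow_bytes = toy.memory_usage().sum()
# Deep measurement: pandas also inspects the string objects themselves.
deep_bytes = toy.memory_usage(deep=True).sum()

print(f"shallow: {shallow_bytes:,} bytes, deep: {deep_bytes:,} bytes")
toy.info(memory_usage="deep")   # info() can report the deep figure as well
```

On real text columns the deep figure is usually the one that matters when deciding whether data fits in RAM.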

In [None]:
!! python -V

<br><br><br>
#### **Use Caution with Various Pandas Operations**
Simple operations, such as renaming a column, can create entirely new copies of DataFrames and thus incur large memory hits.

In [None]:
new_df = df.rename(columns={'ProductId': 'Product_ID'})
new_df.head()

In [None]:
df.head()

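Alternatively, we could have used **inplace=True** so that no second named DataFrame is created (although, depending on the pandas version, pandas may still copy the data internally). Notice below that df and new_df have different IDs, which means they are separate objects at different locations in memory. Each occupies about 40 MB. Because both are global variables, both stay in memory until they are deleted (e.g., with `del new_df`) or the kernel shuts down.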

In [None]:
id(df), id(new_df), sys.getsizeof(df), sys.getsizeof(new_df)
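Another common pattern, instead of `inplace=True`, is to rebind the same name so the old DataFrame loses its last reference and can be freed. A minimal sketch with a tiny made-up frame (the name `reviews` is hypothetical and chosen so it does not overwrite this notebook's `df`):

```python
import pandas as pd

reviews = pd.DataFrame({"ProductId": ["B001", "B002"], "Score": [5, 3]})
original_id = id(reviews)

# rename() returns a new DataFrame. Rebinding the same name drops the last
# reference to the old frame, so CPython can free it right away.
reviews = reviews.rename(columns={"ProductId": "Product_ID"})

# An extra copy kept under another name can be released explicitly with del.
extra_copy = reviews.copy()
del extra_copy

print(id(reviews) != original_id, list(reviews.columns))
```

Both objects briefly coexist while `rename()` runs, so peak memory still doubles for a moment; rebinding only avoids keeping both alive afterwards.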

<br><br><br>
#### Limiting the Feature Set Read
With large datasets, read only the columns you need by passing **usecols** (a list of column positions or names)...

In [None]:
df = load_data(filename, usecols=(0, 1, 4, 5, 6))
df.shape

In [None]:
load_data.time_results

In [None]:
df.info()

In our run, this version used about half the memory and took about half the time to load.
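`usecols` is not limited to positions. This sketch, using a tiny in-memory CSV whose header is only illustrative, selects columns by name and by a callable that pandas evaluates against each column name:

```python
import io
import pandas as pd

# A tiny in-memory CSV standing in for a large file; the header is illustrative.
csv_text = (
    "Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score\n"
    "1,B001,U1,Alice,1,1,5\n"
    "2,B002,U2,Bob,0,0,1\n"
)

# usecols accepts column names, which is less brittle than positions...
subset = pd.read_csv(io.StringIO(csv_text), usecols=["ProductId", "HelpfulnessNumerator", "Score"])

# ...or a callable that returns True for each column name to keep.
helpfulness = pd.read_csv(io.StringIO(csv_text), usecols=lambda col: col.startswith("Helpfulness"))

print(subset)
print(helpfulness.columns.tolist())
```

Note that the returned columns follow the file's order, not the order of the list passed to `usecols`.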

<br><br><br>
#### Limiting the Number of Rows Read
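We can also limit how many rows we read with **nrows** (the number of rows to read) and **skiprows** (the number of lines to skip at the start of the file, or a list of line numbers to skip). When skipping past the header line, pass header=None and supply the column names explicitly...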


In [None]:
df = load_data(filename, usecols=(0, 1, 4, 5, 6), nrows=250000)
df.shape

In [None]:
load_data.time_results

In [None]:
df.info()

In [None]:
df.tail()

In [None]:
names = df.columns

In [None]:
df = load_data(filename, usecols=(0, 1, 4, 5, 6), names=names,
               nrows=250000, skiprows=250001, header=None)   # skip the header line plus the 250,000 rows already read
df.shape

In [None]:
load_data.time_results

In [None]:
df.info()

In [None]:
df.head()
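An alternative that keeps the header on every read is to pass `skiprows` a range that starts at line 1. The sketch below walks a small hypothetical CSV in fixed-size blocks this way (the block size and data are made up):

```python
import io
import pandas as pd

# Hypothetical 10-row CSV; in practice this would be a large file on disk.
csv_text = "id,value\n" + "".join(f"{i},{i * i}\n" for i in range(10))

block_size = 4
blocks = []
start = 0
while True:
    # range(1, start + 1) skips `start` data rows but keeps line 0 (the header),
    # so every block gets proper column names without passing names=.
    block = pd.read_csv(io.StringIO(csv_text), header=0,
                        skiprows=range(1, start + 1), nrows=block_size)
    if block.empty:
        break
    blocks.append(block)
    start += block_size

combined = pd.concat(blocks, ignore_index=True)
print([len(b) for b in blocks])
```

Each read still scans the skipped lines from the top of the file, so for reading a whole file sequentially the `chunksize` argument is usually the better tool.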

<br><br><br>
#### Downcasting
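Downcasting, converting columns to smaller numeric types (for example, float64 to float16 or int64 to int8), can reduce the in-memory size of your data. The trade-off is range and precision: int8 holds only -128 to 127, and float16 keeps only about three significant decimal digits.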

In [None]:
titanic = sns.load_dataset('titanic')
titanic.info()

In [None]:
titanic.dropna(subset=['embark_town'], inplace=True) 
titanic['age'] = titanic['age'].fillna(titanic['age'].median())   # assign back instead of a chained inplace fillna
titanic.drop(labels=['deck'], inplace=True, axis=1)
titanic.replace({'embarked': {'C': 0, 'Q': 1, 'S': 2},
                 'who': {'man': 0, 'woman': 1, 'child': 2}}, inplace=True)
titanic.embark_town = titanic.embark_town.astype('category')
titanic.embark_town = titanic.embark_town.cat.codes
titanic = pd.get_dummies(titanic, columns=['class'])
titanic.drop(labels=['alive', 'alone', 'adult_male', 'sex'], inplace=True, axis=1)
# lb = LabelBinarizer()
# pclass_encoded = pd.DataFrame(lb.fit_transform(titanic.pclass))
# titanic = pd.concat([titanic, pclass_encoded], axis=1, join='inner')
# titanic.drop('pclass', axis=1, inplace=True)
titanic.fare = StandardScaler().fit_transform(titanic.fare.to_numpy().reshape(-1, 1))
titanic.head()

In [None]:
titanic.age = titanic.age.astype(np.float16)
titanic.fare = titanic.fare.astype(np.float16)
titanic.survived = titanic.survived.astype(np.int8)
titanic.sibsp = titanic.sibsp.astype(np.int8)
titanic.parch = titanic.parch.astype(np.int8)
titanic.embarked = titanic.embarked.astype(np.int8)
titanic.who = titanic.who.astype(np.int8)

In [None]:
titanic.info()
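Instead of choosing each target type by hand, `pd.to_numeric` can pick the smallest type that holds a column's values. A small sketch on made-up data (note that its float downcast stops at float32, so it will not choose float16 the way the cell above does):

```python
import numpy as np
import pandas as pd

frame = pd.DataFrame({
    "survived": np.array([0, 1, 1, 0] * 250, dtype=np.int64),
    "fare": np.tile([7.25, 71.2833, 8.05, 53.1], 250),   # float64 by default
})
before = frame.memory_usage(index=False).sum()

# downcast chooses the smallest dtype that can represent every value.
frame["survived"] = pd.to_numeric(frame["survived"], downcast="integer")
frame["fare"] = pd.to_numeric(frame["fare"], downcast="float")

after = frame.memory_usage(index=False).sum()
print(frame.dtypes.to_dict(), before, after)
```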

Use the category type for columns that have a limited set of values. A categorical column stores each distinct value once and represents every row with a small integer code, so it consumes far less memory than a column of many repeated strings.
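A quick way to see the effect is to convert a column of repeated strings and compare deep memory usage. The town names below mirror the Titanic `embark_town` values, but the 30,000-row Series itself is made up:

```python
import numpy as np
import pandas as pd

# 30,000 rows but only three distinct values.
towns = pd.Series(["Southampton", "Cherbourg", "Queenstown"] * 10_000)
as_category = towns.astype("category")

# deep=True counts the string data, not just pointers.
string_bytes = towns.memory_usage(index=False, deep=True)
category_bytes = as_category.memory_usage(index=False, deep=True)

print(as_category.cat.categories.tolist())   # each distinct value stored once
print(as_category.cat.codes.dtype)           # one small integer code per row
print(string_bytes, category_bytes)
```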

<br><br><br>
#### Use Databases
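Use SQL and a database to stage your data and/or limit how much of it you read, for example by selecting only the columns you need and filtering rows in the query rather than in pandas.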
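A minimal sketch of letting the database do the filtering, assuming an in-memory SQLite database stands in for a real server and that the `reviews` table and its contents are hypothetical:

```python
import sqlite3
import pandas as pd

# An in-memory SQLite database stands in for a real database here.
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "ProductId": ["B001", "B002", "B003", "B004"],
    "Score": [5, 1, 4, 5],
    "Text": ["placeholder review text"] * 4,
}).to_sql("reviews", conn, index=False)

# Select only the needed columns and rows; the database does the filtering.
query = "SELECT ProductId, Score FROM reviews WHERE Score >= ? ORDER BY ProductId"
high_scores = pd.read_sql_query(query, conn, params=(4,))

# chunksize turns the result into an iterator of DataFrames.
total_rows = sum(len(chunk) for chunk in pd.read_sql_query(query, conn, params=(4,), chunksize=2))
conn.close()
print(high_scores)
```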

<br><br><br>
#### Use Sampling on Datasets
Below, we take a random subsample of our overall dataset. First we decide how big a sample to take: here, 10%. The variable `sample` (below) holds the resulting number of rows, which for our dataset is about 56485.  
We then choose the rows to skip at random. `random.sample(population, k)` picks k distinct items at random from population, so we pick all but `sample` of the data line numbers and pass them as the **skiprows** argument. Line 0, the header, is never skipped.

In [None]:
percent_sample_size = 10                                            # percentage of rows to keep

with open(filename, encoding='utf-8') as f:                         # count lines without loading the file
    nrows = sum(1 for _ in f) - 1                                   # minus 1 for the header line
sample = nrows * percent_sample_size // 100                         # a 10% sample, for us approx. 56485 rows
skip = sorted(random.sample(range(1, nrows + 1), nrows - sample))   # random data lines to skip (header kept)
df = pd.read_csv(filename, skiprows=skip)
df.shape
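If an approximate sample size is acceptable, `skiprows` also accepts a callable, which avoids counting the lines first. This sketch uses a hypothetical 1,000-row in-memory CSV; the sample holds roughly, not exactly, 10% of the rows:

```python
import io
import random
import pandas as pd

# Hypothetical CSV with a header and 1,000 data rows.
csv_text = "id,value\n" + "".join(f"{i},{i % 7}\n" for i in range(1000))

fraction = 0.10
random.seed(42)  # make the sample reproducible

# Line 0 (the header) is always kept; every other line is kept with
# probability `fraction`.
sample_df = pd.read_csv(
    io.StringIO(csv_text),
    skiprows=lambda line_no: line_no > 0 and random.random() > fraction,
)
print(len(sample_df))
```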

<br><br><br>
#### Load Data in Chunks
By loading our data in chunks, we never handle all of it at once. Instead, we read one part of the data at a time, fit a model on that part, and save the model, which may leave us with several models, one per chunk. Whether this works depends on the model you are creating: some scikit-learn estimators support incremental learning through `partial_fit` (for example, `SGDClassifier`), while others need the entire dataset at once.

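The following sketch assumes you have already defined `features` (a list of feature column names), `label` (the name of the target column), and `X_new` (a DataFrame of rows to predict); these names are placeholders.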

With this approach, you can work with larger data sizes than you have RAM.

<pre>
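import numpy as np
import pandas as pd
# use an algorithm of your choosing
from sklearn.linear_model import LogisticRegression

# features, label, and X_new are placeholders you must define yourself
chunksize = 100000                                      # how many rows to read at a time
models = []
for chunk in pd.read_csv(filename, chunksize=chunksize):
    # pre-process data
    model = LogisticRegression()
    model.fit(chunk[features], chunk[label])
    models.append(model)

# average the per-chunk class probabilities (assumes every chunk contained every class)
probabilities = np.mean([model.predict_proba(X_new[features]) for model in models], axis=0)
predictions = models[0].classes_[probabilities.argmax(axis=1)]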
</pre>

The downside to this approach is that if a failure occurs, there is no failover and you must start all over.
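For estimators that support incremental learning, a single model can instead be updated chunk by chunk with `partial_fit`, so there is nothing to average. A sketch under stated assumptions: the synthetic data, the column names `x1`, `x2`, `y`, and the chunk size are all made up, and `SGDClassifier` is swapped in for `LogisticRegression` because the latter has no `partial_fit`:

```python
import io
import numpy as np
import pandas as pd
from sklearn.linear_model import SGDClassifier

# Synthetic labelled data written to an in-memory CSV (stands in for a large file).
rng = np.random.default_rng(0)
x1, x2 = rng.normal(size=1000), rng.normal(size=1000)
csv_text = pd.DataFrame({"x1": x1, "x2": x2, "y": (x1 + x2 > 0).astype(int)}).to_csv(index=False)

feature_cols = ["x1", "x2"]
sgd = SGDClassifier(random_state=0)
all_classes = np.array([0, 1])  # partial_fit needs every possible class on its first call

for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=200):
    # pre-process the chunk here, then update the same model incrementally
    sgd.partial_fit(chunk[feature_cols], chunk["y"], classes=all_classes)

full = pd.read_csv(io.StringIO(csv_text))
train_accuracy = sgd.score(full[feature_cols], full["y"])
print(f"training accuracy: {train_accuracy:.3f}")
```

Because the fitted model is a single object, it can also be saved after each chunk, which gives a simple restart point if a long run fails.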

<br><br><br>
#### Work with Dask
Another way to handle larger datasets is Dask, which parallelizes NumPy and pandas workloads. A Dask DataFrame implements many of the pandas DataFrame methods, with a similar API, but processes the data in parallel on multiple cores.

As a rough guide, Dask extends Python data analytics to around 100-500 GB; beyond that, PySpark is usually the better choice.
<br><br>
Dask builds a task graph of the requested operations rather than executing them immediately. When the results are needed (for example, when you call `compute()`), the Dask scheduler loads the data in pieces, runs the pieces through the graph in parallel on multiple cores, and combines the partial results into the final result.

The following loads the same file ten times and concatenates the copies into a single pandas DataFrame of 5+ million records, simulating a much larger dataset.

In [None]:
overall_time = 0
frames = []
for count in range(10):
    frames.append(load_data(filename))
    overall_time += load_data.time_results
df = pd.concat(frames, sort=False)
df.shape

In [None]:
print(overall_time, 'seconds')

In [None]:
df.info()

It occupies a significant chunk of memory.

While our decorator is handy, IPython's `%%time` cell magic can report timing results too...

In [None]:
def add_one(df):
    df['HelpfulnessNumerator'] += 1

In [None]:
%%time

add_one(df)

In [None]:
def get_group_size(df, group_name):
    return df.groupby(group_name).size()

In [None]:
%%time

get_group_size(df, 'Score')
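The same group sizes can be computed without holding the whole file in memory by counting chunk by chunk and adding the partial results. A sketch using a hypothetical in-memory CSV in place of the real file:

```python
import io
import pandas as pd

# Hypothetical review scores; in practice this would be the large CSV on disk.
csv_text = "Id,Score\n" + "".join(f"{i},{i % 5 + 1}\n" for i in range(1000))

counts = None
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=250):
    chunk_counts = chunk.groupby("Score").size()
    # add() with fill_value=0 handles scores missing from either side
    counts = chunk_counts if counts is None else counts.add(chunk_counts, fill_value=0)

counts = counts.astype(int)
print(counts)
```

Only one chunk and the small running total are in memory at any time, which is the same idea Dask automates below.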

<br><br>

#### Using Dask...
<br><br>
Dask parallelizes work, which can make code run faster and even ease memory concerns. It can run on a local machine or on a cluster of machines, using multiple threads or processes.

Here, our function runs sequentially and, not surprisingly, takes a while: each call to `func1` sleeps for one second, so the loop takes about five seconds...

In [None]:
def func1(arg1, arg2):
    time.sleep(1)
    return arg1 + arg2

In [None]:
%%time
data = [1, 2, 3, 4, 5]
for val in data:
    print(func1(val, val-1))

<br><br>
Now we'll start a Dask client. Without a scheduler address, `Client` starts a local cluster on this machine; here it has five workers.

In [None]:
from dask import delayed, compute
from dask.distributed import Client

client = Client(n_workers=5)

<br><br>
Dask provides a module called dask.array that behaves like Numpy.

In [None]:
import dask.array as da
dask_array = da.arange(1, 10)
dask_array

In [None]:
dask_array2 = da.random.randint(low=1, high=10, size=1000000000)   # 1 billion element array
dask_array2 = dask_array2.reshape(1000000, 1000)
dask_array2

In [None]:
dask_array2.mean().compute()   # reduces chunk by chunk; computing the whole array would materialize several GB in memory

In [None]:
dask_array2.shape

In [None]:
%%time

result = delayed(func1)(5, 10)
result.compute()

The cell below runs instantly because it merely creates a graph to be run later.
<br><br>

In [None]:
%%time
data = [1, 2, 3, 4, 5]
results = []
delayed_func1 = delayed(func1)
for val in data:
    results.append(delayed_func1(val, val-1))

In [None]:
%%time

compute(results)

<br><br>
Dask provides a DataFrame that behaves like a pandas DataFrame, but its operations are scheduled lazily and can be executed in parallel.

In [None]:
import dask.dataframe as dd

In [None]:
ddf = dd.read_csv([filename] * 10, blocksize=64000000)

In [None]:
ddf.info()

In [None]:
ddf.shape

In [None]:
compute(ddf.shape)

In [None]:
ddf.tail()

In [None]:
add_one(ddf)

In [None]:
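ddf.map_partitions(lambda part: part.assign(HelpfulnessNumerator=part['HelpfulnessNumerator'] + 1))   # return a new partition; add_one returns None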

A Dask DataFrame is split into many pandas DataFrames called partitions, and the function passed to `map_partitions` is called once for each partition. You can influence partition sizes, for example with the `blocksize` argument of `dd.read_csv` (about 64 MB above) or with `repartition()`.

In [None]:
ddf.memory_usage().sum().compute()

In [None]:
# ddf = client.persist(ddf)
result = ddf.groupby('Score').size().compute()

print(type(result))
result

In [None]:
client.close()
print('Client closed!')