# Parallelization with Dask
I'm going to revisit the FMA music database and see if I can run regressions faster with parallel computation on AWS.

In [1]:
# Incantations
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
pd.set_option('display.max_columns', None)  # Unlimited columns
%matplotlib inline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

## Data cleanup

In [2]:
%%time
# The header row holds the names of every column except the very first one, whose name sits alone
# on the row below it.

# Take the column names from the second line of the file (header=1 is zero-based)
tracks = pd.read_csv('tracks.csv', header=1)

# Manually name the first column, and remove the first line where that name used to be.
tracks.rename(columns={'Unnamed: 0':'track_id'}, inplace=True)
tracks.drop(index=0, inplace=True)
tracks.head()

# Put the genre_top column at the beginning
tracks = tracks.reindex(columns=['genre_top'] + [c for c in tracks.columns if c != 'genre_top'])

# Remove any rows that don't have an entry for genre_top
tracks = tracks.dropna(subset=['genre_top'])

garbage_columns = ['track_id','id', 'information','comments.1','title','bio',
                   'members','website','wikipedia_page','split','subset',
                   'comments.2','genres','genres_all','information.1','license','title.1']

tracks2 = tracks.drop(columns=garbage_columns)

# Keep only the numeric columns, then drop any of them that contain missing values
tracks_numeric = tracks2.select_dtypes('number')
tracks_numeric = tracks_numeric.dropna(axis='columns')



CPU times: user 2.27 s, sys: 226 ms, total: 2.5 s
Wall time: 3.37 s
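The header handling above is easier to follow on a toy file. The sketch below is only an illustration: the CSV contents, the `toy` names and the three-line header layout are made up to mimic the situation described in the comments, and are not taken from `tracks.csv`.

```python
import io
import pandas as pd

# Hypothetical miniature file: line 0 = group labels, line 1 = column names,
# line 2 = the lone 'track_id' label for the first column.
toy_csv = io.StringIO(
    ",album,track,track\n"
    ",title,genre_top,listens\n"
    "track_id,,,\n"
    "2,AWOL,Hip-Hop,1293\n"
    "3,AWOL,,514\n"
    "5,Niris,Rock,1151\n"
)

# header=1 uses the second line as column names and skips the line before it
toy = pd.read_csv(toy_csv, header=1)

# The first column has no name on that line, so pandas calls it 'Unnamed: 0'
toy = toy.rename(columns={'Unnamed: 0': 'track_id'})

# Row 0 is the leftover 'track_id' label row, not data
toy = toy.drop(index=0)

# Drop rows (not columns) with no genre_top entry
toy = toy.dropna(subset=['genre_top'])
print(toy)
```

One side effect worth knowing: because the leftover label row was read as data, `track_id` comes back as strings rather than integers. That does not matter here, since the column is dropped later anyway.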


I tried repeating all of these cleanup steps with a Dask DataFrame instead of pandas, but some of the operations are not part of the subset of the pandas API that Dask implements. Lesson learned.

In [3]:
%%time
y = tracks2['genre_top']
X = tracks_numeric

# Split into train and test groups
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.5,
                                                   random_state=42)

CPU times: user 12.3 ms, sys: 21 µs, total: 12.3 ms
Wall time: 11.8 ms


## Gridsearch with Dask

In [14]:
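# sklearn.grid_search was removed in scikit-learn 0.20; GridSearchCV now lives in model_selection
from sklearn.model_selection import GridSearchCV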
import dask_searchcv as dcv

In [15]:
param_space = {'C':[1, 10, 100]}

model = LogisticRegression(multi_class='ovr',
                            solver='liblinear',
                            max_iter=500)

search_dask = dcv.GridSearchCV(model, param_space, cv=3)
search_sklearn = GridSearchCV(model, param_space, cv=3)
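A rough way to see why this search is a good candidate for parallelization is to count the independent model fits it needs. This sketch uses scikit-learn's `ParameterGrid` to enumerate the combinations. It only counts tasks and says nothing about how either library actually schedules them.

```python
from sklearn.model_selection import ParameterGrid

param_space = {'C': [1, 10, 100]}
n_folds = 3  # matches cv=3 in the searches above

# One independent fit per (parameter combination, fold) pair
tasks = [(params, fold)
         for params in ParameterGrid(param_space)
         for fold in range(n_folds)]

n_cv_fits = len(tasks)
n_total_fits = n_cv_fits + 1  # refit=True adds one final fit on the full data

print(n_cv_fits, n_total_fits)
```

None of the cross-validation fits depends on another, so they can run at the same time. Only the final refit has to wait for the rest to finish.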

In [13]:
%%time
# Executing a grid search with the original sklearn function
search_sklearn.fit(X, y)

CPU times: user 1min 28s, sys: 0 ns, total: 1min 28s
Wall time: 1min 28s


GridSearchCV(cv=3, error_score='raise',
       estimator=LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=500, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False),
       fit_params={}, iid=True, n_jobs=1, param_grid={'C': [1, 10, 100]},
       pre_dispatch='2*n_jobs', refit=True, scoring=None, verbose=0)

In [16]:
%%time
# Executing a grid search with the new Dask function
search_dask.fit(X, y)

CPU times: user 2min 6s, sys: 75.9 ms, total: 2min 6s
Wall time: 30.5 s


GridSearchCV(cache_cv=True, cv=3, error_score='raise',
       estimator=LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,
          intercept_scaling=1, max_iter=500, multi_class='ovr', n_jobs=1,
          penalty='l2', random_state=None, solver='liblinear', tol=0.0001,
          verbose=0, warm_start=False),
       iid=True, n_jobs=-1, param_grid={'C': [1, 10, 100]}, refit=True,
       return_train_score='warn', scheduler=None, scoring=None)

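Looks like parallelizing our computation brought the wall time down from 1 min 28 s (88 seconds) to 30.5 seconds, roughly a 2.9x speedup. Total CPU time actually went up (2 min 6 s versus 1 min 28 s), so the gain comes from spreading the fits across cores: the scikit-learn search ran with its default `n_jobs=1`, while the Dask search reports `n_jobs=-1`. Not bad!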
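The arithmetic behind that comparison is short. The timings are copied from the `%%time` outputs above. The "average cores busy" figure is just CPU time divided by wall time, so treat it as a rough indicator rather than a measurement of the machine.

```python
# Timings from the %%time outputs above, converted to seconds
sklearn_wall = 1 * 60 + 28   # scikit-learn search: Wall time 1min 28s
dask_wall = 30.5             # Dask search: Wall time 30.5 s
dask_cpu = 2 * 60 + 6        # Dask search: total CPU time 2min 6s

# How many times faster the Dask search finished
speedup = sklearn_wall / dask_wall

# CPU seconds consumed per wall-clock second, i.e. cores kept busy on average
avg_busy_cores = dask_cpu / dask_wall

print(f"speedup: {speedup:.2f}x")
print(f"average cores busy: {avg_busy_cores:.1f}")
```

A speedup below the number of busy cores is expected here, since the parallel run spent more total CPU time than the serial one.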