In [1]:
from dask.distributed import Client

# Attach the notebook to an already-running Dask scheduler. The address is
# hard-coded, so a scheduler must be listening there before this cell runs.
SCHEDULER_ADDRESS = "tcp://127.0.0.1:33017"
client = Client(SCHEDULER_ADDRESS)
client  # last expression: renders the client / cluster summary

0,1
Client  Scheduler: tcp://127.0.0.1:33017  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.24 GB


In [2]:
import pandas as pd
import joblib
import gcsfs

#Dask
import dask.dataframe as dd
from dask_ml.cluster import SpectralClustering
from dask_ml.cluster import KMeans
from dask_ml.model_selection import train_test_split, GridSearchCV, IncrementalSearchCV

#Sklearn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn import svm, linear_model, tree
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, AdaBoostRegressor

#Local Files
import src.features_engineering as fte
import src.clustering as cl
import src.supervised_learning as sl
import src.mongodb_database as mdb

In [3]:
# Input files, relative to the notebook's working directory.
ratings_csv_path = './input/ratings_small.csv'
genres_dummies_csv_path = './input/genres_dummies.csv'

# The genre dummies are read eagerly with pandas; the ratings are read
# with dask.dataframe.
genres_dummies = pd.read_csv(genres_dummies_csv_path)
ratings = dd.read_csv(ratings_csv_path)

### Feature Engineering Pipeline

In [4]:
%%time
ratings = (ratings.pipe(fte.addUserFeatures)
           .pipe(fte.addMoviesFeatures)
           .pipe(fte.filterbyRatingsAmount, min_rt=100, max_rt=1000)
           .pipe(fte.addWeekdayColumns)
           .pipe(fte.addGenresDummies, genres_dummies=genres_dummies)
           #.pipe(fte.popularityNormalizer)
           #.pipe(fte.ratingsNormalizer)
          )

CPU times: user 358 ms, sys: 16.6 ms, total: 374 ms
Wall time: 1.3 s


We're assuming that the indexes of each dataframes are 
 aligned. This assumption is not generally safe.
  "Concatenating dataframes with unknown divisions.\n"


In [5]:
# Preview the first rows of the engineered ratings frame.
ratings.head()

Unnamed: 0,userId,movieId,GT,timestamp,user_rt_count,user_rt_mean,movie_rt_mean,popularity,weekday,weekday_6,...,Romance,Science Fiction,Sentai Filmworks,TV Movie,Telescene Film Group Productions,The Cartel,Thriller,Vision View Entertainment,War,Western
0,4,1371,4.0,949810302,204,4.348039,3.053191,47,6,1,...,0,0,0,0,0,0,0,0,0,0
1,19,1371,4.0,855193404,423,3.534279,3.053191,47,3,0,...,0,0,0,0,0,0,0,0,0,0
2,21,1371,3.0,853852263,162,3.506173,3.053191,47,1,0,...,0,0,0,0,0,0,0,0,0,0
3,22,1371,2.0,1131662302,220,3.275,3.053191,47,3,0,...,0,0,0,0,0,0,0,0,0,0
4,41,1371,3.5,1093886662,199,3.866834,3.053191,47,0,0,...,0,0,0,0,0,0,0,0,0,0


# __Clustering__

In [6]:
# Build the user x genre matrix from the ratings, then pass it through the
# project's scaling helper.
user_genre_matrix = cl.userGenresMatrix(ratings, genres_dummies)
users_genres = cl.dataScaling(user_genre_matrix)

# Dask-array view of the same data; lengths=True computes the chunk sizes up
# front so the array has known shape.
users_genres_da = users_genres.to_dask_array(lengths=True)

users_genres.head()

Unnamed: 0_level_0,Action,Adventure,Animation,Aniplex,BROSTA TV,Carousel Productions,Comedy,Crime,Documentary,Drama,...,Romance,Science Fiction,Sentai Filmworks,TV Movie,Telescene Film Group Productions,The Cartel,Thriller,Vision View Entertainment,War,Western
userId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
4,0.259259,0.183333,0.230769,0.0,0.0,0.0,0.211864,0.225352,0.076923,0.222222,...,0.266667,0.315789,0.0,0.0,0.0,0.0,0.254386,0.0,0.176471,0.0
8,0.061728,0.033333,0.076923,0.0,0.0,0.0,0.076271,0.028169,0.230769,0.051587,...,0.077778,0.052632,0.0,0.0,0.0,0.0,0.026316,0.0,0.0,0.090909
17,0.283951,0.35,0.307692,0.0,0.0,0.0,0.245763,0.366197,0.076923,0.27381,...,0.2,0.333333,0.0,0.5,0.0,0.0,0.324561,0.0,0.235294,0.545455
19,0.580247,0.6,0.538462,0.0,0.0,0.0,0.491525,0.690141,0.461538,0.527778,...,0.5,0.491228,0.0,0.0,0.0,0.0,0.552632,0.0,0.352941,0.272727
21,0.135802,0.133333,0.230769,0.0,0.0,0.0,0.110169,0.225352,0.153846,0.142857,...,0.177778,0.140351,0.0,0.0,0.0,0.0,0.140351,0.0,0.058824,0.090909


In [7]:
%%time
#Spectral Clustering
clusters_number = 4
spcl = SpectralClustering(n_clusters=clusters_number, affinity='polynomial', n_jobs=-1)
with joblib.parallel_backend('dask'):
    clusters = spcl.fit_predict(users_genres_da)
clusters_index = cl.getClustersIndex(clusters, users_genres)
#clusters_index.to_csv('./output/clusters-index/clusters-index-spcl-poly-4-*.csv')
clusters_index.compute()['cluster'].value_counts()

CPU times: user 32.2 s, sys: 2.63 s, total: 34.8 s
Wall time: 2min 20s


1    82
3    80
0    45
2    41
Name: cluster, dtype: int64

In [8]:
# Attach each user's cluster label to their ratings.
# Drop any `cluster` column left over from a previous run of this cell first:
# merging twice would otherwise create suffixed duplicates instead of
# refreshing the label, which breaks later `['cluster']` lookups.
if 'cluster' in ratings.columns:
    ratings = ratings.drop('cluster', axis=1)
ratings = ratings.merge(clusters_index, on='userId')

In [20]:
# Preview the ratings after the merge; the frame now carries a `cluster` column.
ratings.head()

Unnamed: 0,userId,movieId,GT,timestamp,user_rt_count,user_rt_mean,movie_rt_mean,popularity,weekday,weekday_6,...,Science Fiction,Sentai Filmworks,TV Movie,Telescene Film Group Productions,The Cartel,Thriller,Vision View Entertainment,War,Western,cluster
0,4,1371,4.0,949810302,204,4.348039,3.053191,47,6,1,...,0,0,0,0,0,0,0,0,0,0
1,4,2105,4.0,949896114,204,4.348039,3.478723,47,0,0,...,0,0,0,0,0,0,0,0,0,0
2,4,2193,3.0,949896070,204,4.348039,3.202381,42,0,0,...,0,0,0,0,0,0,0,0,0,0
3,4,153,4.0,949811346,204,4.348039,2.782946,129,6,1,...,0,0,0,0,0,0,0,0,0,0
4,4,185,3.0,949920047,204,4.348039,3.102941,102,0,0,...,1,0,0,0,0,0,0,0,0,0


# __Supervised Learning__

## __Full Dataset__

### X & y definition

In [19]:
# Feature columns fed to the regressors, assembled from three groups.
# The order of each group is significant: it fixes the column order of X.
rating_stat_columns = ['user_rt_mean', 'movie_rt_mean', 'popularity']
weekday_columns = ['weekday_{}'.format(day) for day in (6, 2, 3, 0, 1, 5, 4)]
genre_columns = [
    'Action', 'Adventure', 'Animation', 'Comedy', 'Crime',
    'Documentary', 'Drama', 'Family', 'Fantasy', 'Foreign',
    'History', 'Horror', 'Music', 'Mystery', 'Romance',
    'Science Fiction', 'TV Movie', 'Thriller', 'War', 'Western',
]
X_columns = rating_stat_columns + weekday_columns + genre_columns

# Regression target.
y_columns = ['GT']

In [20]:
# Convert the feature and target selections to Dask arrays, features first.
# lengths=True computes chunk sizes so both arrays have known shapes.
X, y = (
    ratings[selected].to_dask_array(lengths=True)
    for selected in (X_columns, y_columns)
)

In [21]:
# Hold out 10% of the rows for evaluation. The split is seeded so the metrics
# reported further down can be reproduced on a re-run.
# y is selected as a single-column 2-D array; ravel() flattens it to the 1-D
# target the regressors expect.
split_seed = 42
X_train, X_test, y_train, y_test = train_test_split(
    X, y.ravel(), test_size=0.1, random_state=split_seed)

### Model Selection

In [22]:
models = {
        "RandomForest": RandomForestRegressor(n_estimators=100, n_jobs=-1),
        "SGDRegressor": linear_model.SGDRegressor(max_iter=1000, tol=1e-3),
        "DecisionTree" : tree.DecisionTreeRegressor(random_state=0),
        "GradientBoostingRegressor": GradientBoostingRegressor(n_estimators=100),
        "AdaBoostRegressor" : AdaBoostRegressor(n_estimators=100)    
    }
with joblib.parallel_backend('dask'):
    %time _ = sl.mlmodelSelection(models, X_train, X_test, y_train, y_test)  


Training model: RandomForest
RSME 0.8479329595177765
MAE 0.648246526596348
r2_score 0.33111405080417056

Training model: SGDRegressor
RSME 344185168251.05835
MAE 245486061314.7889
r2_score -1.1020805625216284e+23

Training model: DecisionTree
RSME 1.1713700780779164
MAE 0.856328683225587
r2_score -0.2764896318001946

Training model: GradientBoostingRegressor
RSME 0.8091543322640741
MAE 0.6188557061628723
r2_score 0.39089555762352646

Training model: AdaBoostRegressor
RSME 0.8710922439973539
MAE 0.6906050929376095
r2_score 0.29407699796453357
CPU times: user 36.8 s, sys: 3.24 s, total: 40.1 s
Wall time: 3min 10s


In [None]:
# Hyperparameter search for GradientBoostingRegressor.
# GridSearchCV here is the dask_ml.model_selection implementation imported at
# the top of the notebook.
model = GradientBoostingRegressor()
params = dict(
    loss=['ls', 'lad', 'huber', 'quantile'],
    n_estimators=[100, 400, 700],
    max_depth=[3, 10, 15],
)
search = GridSearchCV(model, params)

# Fit every parameter combination with joblib work routed to Dask.
with joblib.parallel_backend('dask'):
    search.fit(X_train, y_train)

In [None]:
# Show the search's cv_results_ as a DataFrame for inspection.
pd.DataFrame(search.cv_results_)

In [None]:
# Final training: fit a GradientBoostingRegressor with default hyperparameters
# on the training split and report its metrics on the held-out split.
with joblib.parallel_backend('dask'):
    gbr = GradientBoostingRegressor()
    gbr.fit(X_train, y_train)
    y_pred = gbr.predict(X_test)

    # (label, metric) pairs, evaluated and printed one at a time in this order.
    # The "RSME" label matches the spelling in the model-selection output; the
    # value is the square root of the mean squared error.
    report = (
        ("RSME", lambda truth, pred: mean_squared_error(truth, pred)**0.5),
        ("MAE", mean_absolute_error),
        ("r2_score", r2_score),
    )
    for label, metric in report:
        print(label, metric(y_test, y_pred))

In [34]:
# Persist the fitted default-parameter model to disk with joblib.
gbr_model_path = './output/models/gbrdefaultpickle_file.joblib'
with open(gbr_model_path, 'wb') as gbr_file:
    joblib.dump(gbr, gbr_file)

# Upload Users and Movies to MongoDB Database

In [9]:
%%time
#Uploading movies to MongoDB cluster.
movies_list = ratings['movieId'].unique().compute()
for movieid in movies_list:
    mdb.addMoviesBulk(ratings,users_genres, movieid)

  ["('concat-34402dbcff87286f7cb0a44256265b42', 0)", ...  x 33 columns]]
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


TypeError: cannot convert the series to <class 'float'>

In [None]:
%%time
# Uploading users to MongoDB Cluster
users_list = ratings['userId'].unique().compute()
for userid in users_list:
    mdb.addUsersBulk(ratings, userid)