In [1]:
import pyspark
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
import tensorflow as tf

In [3]:
spark = pyspark.sql.SparkSession.builder.master("local").getOrCreate()

ratings = spark.read.json('data/ratings.json')
#requests = spark.read.json('data/requests.json')

In [4]:
ratings.show(10)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
|     213|     5|9.56678856E8|   6040|
|    3111|     5|9.56678856E8|   6040|
|     573|     4|9.56678856E8|   6040|
|    3505|     4|9.56678856E8|   6040|
|    1734|     2|9.56678881E8|   6040|
+--------+------+------------+-------+
only showing top 10 rows



In [5]:
users = spark.read.load("data/users.dat",
     format="csv", sep=":", inferSchema="true").drop('_c1', '_c3', '_c5', '_c7')

users = (users.withColumnRenamed(users.schema.names[0], 'user_id')
              .withColumnRenamed(users.schema.names[1], 'gender')
              .withColumnRenamed(users.schema.names[2], 'age')
              .withColumnRenamed(users.schema.names[3], 'occupation')
              .withColumnRenamed(users.schema.names[4], 'zip'))

In [6]:
users.show(10)

+-------+------+---+----------+-----+
|user_id|gender|age|occupation|  zip|
+-------+------+---+----------+-----+
|      1|     F|  1|        10|48067|
|      2|     M| 56|        16|70072|
|      3|     M| 25|        15|55117|
|      4|     M| 45|         7|02460|
|      5|     M| 25|        20|55455|
|      6|     F| 50|         9|55117|
|      7|     M| 35|         1|06810|
|      8|     M| 25|        12|11413|
|      9|     M| 25|        17|61614|
|     10|     F| 35|         1|95370|
+-------+------+---+----------+-----+
only showing top 10 rows



In [None]:
# Alternate strategy
# with open('data/users.dat') as f:
#     with open('data/users.csv', 'w') as f2:
#         f2.write('user_id, sex, age, occupation, zip\n')
#         for line in f:
#             f2.write(line.replace('::', ','))
            
# users = spark.read.csv('data/users.csv', header=True)
# users.show(10)

In [62]:
df_raw = users.join(ratings, 'user_id', 'inner').select('gender', 'age', 'occupation', 'zip', 'movie_id', 'rating').toPandas()
df = df_raw.copy()

In [63]:
n = 2
df['zip'] = df_raw['zip'].map(lambda x: str('0'*n)[len(x[:n]):] + x[:n] + 'x' * (5-n))
df['gender'] = df_raw['gender'].map({'M':0, 'F':1})
df['age'] = df_raw['age'].map({1:15, 18:21, 25:30, 35:40, 45:47, 50:53, 56:65})

ohe = OneHotEncoder(sparse=False)
ohe_m = OneHotEncoder(sparse=False)

np_ohe_zips = ohe.fit_transform(df[['zip']])
np_ohe_movies = ohe_m.fit_transform(df[['movie_id']])

In case you used a LabelEncoder before this OneHotEncoder to convert the categories to integers, then you can now use the OneHotEncoder directly.


In [None]:
np_df = np.array(df.drop(['zip', 'movie_id'], axis=1))
np_final = np.column_stack((np_df, np_ohe_zips, np_ohe_movies))
ohe_zips_labels = ['zip' + s[s.find('_'):] for s in ohe.get_feature_names()]
col_names = list(df.columns.drop(['zip', 'movie_id'])) + ohe_zips_labels + ohe_m.get_feature_names()
df_final = pd.DataFrame(np_final, columns=col_names)

In [58]:
X_train, X_test, y_train, y_test = train_test_split(
    df_final.drop('rating', axis=1), 
    df_final['rating'],
    train_size=.7
)

In [35]:
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, BaggingClassifier

In [51]:
forest = RandomForestClassifier(n_estimators=50, n_jobs=-1, min_samples_leaf=2)

In [52]:
forest.fit(X_train, y_train)

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
                       max_depth=None, max_features='auto', max_leaf_nodes=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=2, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=50, n_jobs=-1,
                       oob_score=False, random_state=None, verbose=0,
                       warm_start=False)

In [53]:
print(forest.score(X_train, y_train), 'is the training score')
forest.score(X_test, y_test)

0.7259077145317665 is the training score


0.3489609617951864

In [56]:
grad_boost = GradientBoostingClassifier(n_estimators=35)
grad_boost.fit(X_train, y_train)

GradientBoostingClassifier(criterion='friedman_mse', init=None,
                           learning_rate=0.1, loss='deviance', max_depth=3,
                           max_features=None, max_leaf_nodes=None,
                           min_impurity_decrease=0.0, min_impurity_split=None,
                           min_samples_leaf=1, min_samples_split=2,
                           min_weight_fraction_leaf=0.0, n_estimators=35,
                           n_iter_no_change=None, presort='auto',
                           random_state=None, subsample=1.0, tol=0.0001,
                           validation_fraction=0.1, verbose=0,
                           warm_start=False)

In [57]:
print(grad_boost.score(X_train, y_train), 'is the training score')
grad_boost.score(X_test, y_test)

0.3631274828457927 is the training score


0.3594215647691584