In [1]:
import pyspark
import numpy as np
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import functions, types
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = (pyspark.sql.SparkSession.builder 
  .master("local[*]")
  .getOrCreate())

In [152]:
path_users = 'data/users.dat'
users = (spark.read.load("data/users.dat",
                         format="csv", sep=":", inferSchema="true")
                        .drop('_c1', '_c3', '_c5', '_c7'))

In [153]:
users = (users.withColumnRenamed(users.schema.names[0], 'userID')
                        .withColumnRenamed(users.schema.names[1], 'gender')
                        .withColumnRenamed(users.schema.names[2], 'age')
                        .withColumnRenamed(users.schema.names[3], 'occupation')
                        .withColumnRenamed(users.schema.names[4], 'zip'))

In [154]:
users.schema.names

['userID', 'gender', 'age', 'occupation', 'zip']

In [155]:
print((users.count(), len(users.columns)))

(6040, 5)


In [156]:
users.show(5)

+------+------+---+----------+-----+
|userID|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|     1|     F|  1|        10|48067|
|     2|     M| 56|        16|70072|
|     3|     M| 25|        15|55117|
|     4|     M| 45|         7|02460|
|     5|     M| 25|        20|55455|
+------+------+---+----------+-----+
only showing top 5 rows



In [157]:
# read in the dataset into pyspark DataFrame
path_ratings = 'data/ratings.json'
ratings = spark.read.json(path_ratings)

In [158]:
ratings.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [159]:
print((ratings.count(), len(ratings.columns)))

(719949, 4)


In [160]:
ratings.schema.names

['movie_id', 'rating', 'timestamp', 'user_id']

In [161]:
ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [162]:
ratings.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [163]:
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: double, user_id: bigint]

#### inspect requests dataset

In [240]:
requests = spark.read.json('data/requests.json')
requests.persist()

DataFrame[movie_id: bigint, rating: double, timestamp: double, user_id: bigint]

In [241]:
requests.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [242]:
requests.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|    2019|   NaN|9.56678777E8|   6040|
|     759|   NaN|9.56679248E8|   6040|
|    2858|   NaN|9.56679275E8|   6040|
|     246|   NaN|9.56679413E8|   6040|
|    1617|   NaN|9.56679473E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [258]:
print((requests.count(), len(requests.columns)))

(280260, 4)


#### inspect users dataset

In [167]:
schema_users = StructType(
    [
        StructField('userId', IntegerType()),
        StructField('gender', IntegerType()),
        StructField('age', FloatType()),
        StructField('occupation', LongType()),
        StructField('zip', StringType())
        
    ]
)

NameError: name 'StructType' is not defined

### Model

In [168]:
# convert format of datetime column 'timestamp' from epoch to standard 
ratings = (ratings.withColumn('timestamp',
                    functions.date_format(ratings.timestamp.cast(dataType=types.TimestampType()),
                    "yyyy-MM-dd HH:mm:ss")))

In [169]:
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [170]:
ratings.show(5)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
+--------+------+-------------------+-------+
only showing top 5 rows



In [171]:
ratings = ratings.sort(ratings.timestamp.asc())

In [172]:
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [173]:
ratings.show(5)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
+--------+------+-------------------+-------+
only showing top 5 rows



In [174]:
print((ratings.count(), len(users.columns)))

(719949, 5)


In [175]:
719949*.8

575959.2000000001

In [176]:
719949 *.2

143989.80000000002

In [178]:
# Sort by index and get first 4000 rows
ratings_train = ratings.sort(ratings.timestamp.asc()).limit(575959)

In [179]:
ratings_train.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [180]:
ratings_train.show(5)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
+--------+------+-------------------+-------+
only showing top 5 rows



In [181]:
print((ratings_train.count(), len(ratings_train.columns)))

(575959, 4)


In [182]:
ratings_test = ratings.subtract(ratings_train)

In [184]:
ratings_test.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [185]:
print((ratings_test.count(), len(ratings_test.columns)))

(143990, 4)


In [32]:
als = ALS(
    rank=10,
    maxIter=10,
    userCol='user_id',
    itemCol='movie_id',
    ratingCol='rating',
)

In [186]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics

# fit the ALS model to the training set

als_model = als.fit(ratings_train)

In [191]:
# evaluate it to determine just how well it performed.
# generate predictions with your model for the test set by using the transform method on your ALS model
# evaluate your model and print out the RMSE from your test set
# ALSModel.transform?
predictions = als_model.transform(ratings_train)
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint, prediction: float]

In [197]:
ratings_train.sort(ratings_train.timestamp.asc()).show(10)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|     213|     5|2000-04-25 09:07:36|   6040|
|    3505|     4|2000-04-25 09:07:36|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
|    3111|     5|2000-04-25 09:07:36|   6040|
|     573|     4|2000-04-25 09:07:36|   6040|
|    1734|     2|2000-04-25 09:08:01|   6040|
+--------+------+-------------------+-------+
only showing top 10 rows



In [198]:
predictions.sort(predictions.timestamp.asc()).show(10)

+--------+------+-------------------+-------+----------+
|movie_id|rating|          timestamp|user_id|prediction|
+--------+------+-------------------+-------+----------+
|     858|     4|2000-04-25 09:05:32|   6040| 4.1999373|
|    2384|     4|2000-04-25 09:05:54|   6040| 3.2600207|
|     593|     5|2000-04-25 09:05:54|   6040| 3.7471366|
|    1961|     4|2000-04-25 09:06:17|   6040| 3.2740462|
|     573|     4|2000-04-25 09:07:36|   6040| 3.0070095|
|    3111|     5|2000-04-25 09:07:36|   6040|   3.25006|
|    3505|     4|2000-04-25 09:07:36|   6040|  3.256259|
|     213|     5|2000-04-25 09:07:36|   6040| 3.8371167|
|    1419|     3|2000-04-25 09:07:36|   6040| 3.3362842|
|    1734|     2|2000-04-25 09:08:01|   6040| 3.2702577|
+--------+------+-------------------+-------+----------+
only showing top 10 rows



In [199]:
user_factors = als_model.userFactors

In [238]:
user_factors.sort(user_factors.id.asc()).show(10)

+----+--------------------+
|  id|            features|
+----+--------------------+
|1570|[-0.0035724486, 0...|
|1571|[0.26768214, 0.55...|
|1572|[-0.01236396, 0.0...|
|1573|[-0.030469889, 0....|
|1574|[1.0296766, 1.317...|
|1575|[0.61713946, 0.69...|
|1576|[-0.4837431, 1.35...|
|1577|[0.646382, 0.5142...|
|1578|[-0.26934254, 0.4...|
|1579|[0.055627476, 0.1...|
+----+--------------------+
only showing top 10 rows



In [200]:
user_factors.show(10)

+----+--------------------+
|  id|            features|
+----+--------------------+
|1570|[-0.0035724486, 0...|
|1580|[0.37693256, 0.41...|
|1590|[-0.2613688, -0.0...|
|1600|[0.06716574, 0.60...|
|1610|[0.19097896, 0.25...|
|1620|[-0.011128309, -0...|
|1630|[0.022146514, 0.3...|
|1640|[-0.009884676, 0....|
|1650|[-0.011244012, 0....|
|1660|[0.15514812, 0.29...|
+----+--------------------+
only showing top 10 rows



In [201]:
print((user_factors.count(), len(user_factors.columns)))

(4464, 2)


In [204]:
item_factors = als_model.itemFactors

In [207]:
item_factors.show(10)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.20901716, 0.16...|
| 20|[0.21886238, 0.16...|
| 30|[0.027615188, -0....|
| 40|[0.2723992, 0.165...|
| 50|[-0.2152021, 0.11...|
| 60|[0.36570826, 0.40...|
| 70|[0.4447271, 0.079...|
| 80|[0.2608477, -0.15...|
| 90|[0.16204856, 0.35...|
|100|[0.32400677, 0.00...|
+---+--------------------+
only showing top 10 rows



In [205]:
print((item_factors.count(), len(item_factors.columns)))

(3576, 2)


In [208]:
user_1570_row = user_factors[user_factors['id'] == 1570].first()

In [212]:
# note - Alex and I got significantly different values for the below
user_1570_row

Row(id=1570, features=[-0.0035724486224353313, 0.02424660138785839, -1.0096547603607178, 0.5916305184364319, -0.9070777893066406, 0.9394926428794861, -0.7845749258995056, 1.0537766218185425, 1.216884732246399, -0.23996910452842712])

In [213]:
user_1570_factors = np.array(user_1570_row['features'])

In [214]:
user_1570_factors

array([-0.00357245,  0.0242466 , -1.00965476,  0.59163052, -0.90707779,
        0.93949264, -0.78457493,  1.05377662,  1.21688473, -0.2399691 ])

In [216]:
toy_story_row = item_factors[item_factors['id'] == 1].first()
toy_story_factors = np.array(toy_story_row['features'])

In [217]:
toy_story_row

Row(id=1, features=[-0.09004126489162445, 0.5152695178985596, -1.174152135848999, -0.14338530600070953, -0.29830971360206604, 0.4855971932411194, -0.33697500824928284, 0.39033833146095276, 1.0716793537139893, 0.02579941414296627])

In [218]:
toy_story_factors

array([-0.09004126,  0.51526952, -1.17415214, -0.14338531, -0.29830971,
        0.48559719, -0.33697501,  0.39033833,  1.07167935,  0.02579941])

In [None]:
user_1570_factors

In [None]:
m_factors

In [219]:
user_1570_factors @ toy_story_factors

3.813908210327995

In [None]:
# predictions.sort(predictions.timestamp.asc()).show(10)

In [None]:
# ratings.where(col('rating').isNull())

In [230]:
user_1570_preds = predictions[predictions['user_id'] == 1570]

In [231]:
user_1570_preds.sort('movie_id').show()

+--------+------+-------------------+-------+----------+
|movie_id|rating|          timestamp|user_id|prediction|
+--------+------+-------------------+-------+----------+
|     110|     3|2000-11-20 00:26:54|   1570| 3.2238398|
|     912|     5|2000-11-20 00:27:25|   1570| 4.5510798|
|    2473|     1|2000-11-20 00:27:25|   1570| 1.2502807|
|    2502|     4|2000-11-20 00:26:54|   1570| 3.5724447|
+--------+------+-------------------+-------+----------+



In [232]:
# produces 
recs = als_model.recommendForAllUsers(numItems=10)
recs.persist()

In [239]:
# returns list of lists
recs.sort(recs.user_id.asc()).show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1570|[[2962, 5.3826337...|
|   1571|[[572, 5.652255],...|
|   1572|[[572, 4.6336207]...|
|   1573|[[2197, 5.5041585...|
|   1574|[[3915, 6.2564816...|
|   1575|[[2192, 5.682795]...|
|   1576|[[572, 5.237498],...|
|   1577|[[2197, 5.563936]...|
|   1578|[[3338, 5.2736936...|
|   1579|[[572, 4.6926217]...|
|   1580|[[2197, 4.4761944...|
|   1582|[[2332, 5.6327896...|
|   1583|[[3746, 5.185377]...|
|   1584|[[751, 4.6382904]...|
|   1585|[[3746, 4.655792]...|
|   1586|[[572, 5.7382555]...|
|   1587|[[572, 4.596559],...|
|   1588|[[572, 5.2303925]...|
|   1589|[[3338, 5.1366005...|
|   1590|[[3338, 5.178149]...|
+-------+--------------------+
only showing top 20 rows



In [None]:
# normalized or standard_scalar, row-wise, normalize per movie

In [None]:
recs[recs['userId']==10].first()['recommendations']

In [None]:
# !grep 3086 < data/movies.csv

In [249]:
preds_requests = als_model.transform(requests)

In [250]:
preds_requests.persist()

DataFrame[movie_id: bigint, rating: double, timestamp: double, user_id: bigint, prediction: float]

In [256]:
preds_requests.sort(preds_requests.timestamp.asc()).show(10)

+--------+------+------------+-------+----------+
|movie_id|rating|   timestamp|user_id|prediction|
+--------+------+------------+-------+----------+
|    2019|   NaN|9.56678777E8|   6040| 4.2546625|
|     759|   NaN|9.56679248E8|   6040|  4.025093|
|    2858|   NaN|9.56679275E8|   6040| 3.9846306|
|     246|   NaN|9.56679413E8|   6040| 3.9612033|
|    1617|   NaN|9.56679473E8|   6040|  3.914388|
|    2324|   NaN|9.56679629E8|   6040|  3.665716|
|    1089|   NaN|9.56679796E8|   6040| 3.6774757|
|    2804|   NaN|9.56680123E8|   6039|  3.951664|
|     933|   NaN| 9.5668027E8|   6039| 3.9608939|
|    1304|   NaN|9.56680308E8|   6039| 3.9800572|
+--------+------+------------+-------+----------+
only showing top 10 rows



In [257]:
print((preds_requests.count(), len(preds_requests.columns)))

(280260, 5)


In [259]:
# class pyspark.ml.evaluation.RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName='rmse')d
# Note Experimental
# Evaluator for Regression, which expects two input columns: prediction and label.

evaluator = RegressionEvaluator(predictionCol="raw", metricName='rmse')
# scoreAndLabels = [(-28.98343821, -27.0), (20.21491975, 21.5),
#  (-25.98418959, -22.0), (30.69731842, 33.0), (74.69283752, 71.0)]
# dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "rating"])
# 
# evaluator = RegressionEvaluator(predictionCol="raw")
# evaluator.evaluate(dataset)
# 2.842...
# evaluator.evaluate(dataset, {evaluator.metricName: "r2"})
# 0.993...
# evaluator.evaluate(dataset, {evaluator.metricName: "mae"})
# 2.649...
# re_path = temp_path + "/re"
# evaluator.save(re_path)
# evaluator2 = RegressionEvaluator.load(re_path)
# str(evaluator2.getPredictionCol())
# 'raw'

AttributeError: 'ALSModel' object has no attribute 'score'