In [62]:
import pyspark
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import functions, types
import numpy as np

In [2]:
spark = (pyspark.sql.SparkSession.builder
         .master("local[*]") #local[4]: run on my computer, use 4 processors, 
                             #local[*]: use a processor for each core
         .getOrCreate())

In [3]:
!ls data/

LICENSE             movies.dat          ratings.json        users.dat
README              [31mmovies_metadata.csv[m[m requests.json


In [4]:
!head data/movies.dat #3952 entries

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller


In [5]:
!head data/users.dat #UserID::Gender::Age::Occupation::Zip-code

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370


In [6]:
!head data/ratings.json

{"user_id": 6040, "movie_id": 858, "rating": 4, "timestamp": 956678732.0}
{"user_id": 6040, "movie_id": 2384, "rating": 4, "timestamp": 956678754.0}
{"user_id": 6040, "movie_id": 593, "rating": 5, "timestamp": 956678754.0}
{"user_id": 6040, "movie_id": 1961, "rating": 4, "timestamp": 956678777.0}
{"user_id": 6040, "movie_id": 1419, "rating": 3, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 213, "rating": 5, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 3111, "rating": 5, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 573, "rating": 4, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 3505, "rating": 4, "timestamp": 956678856.0}
{"user_id": 6040, "movie_id": 1734, "rating": 2, "timestamp": 956678881.0}


In [7]:
!head data/requests.json

{"user_id": 6040, "movie_id": 2019, "rating": NaN, "timestamp": 956678777.0}
{"user_id": 6040, "movie_id": 759, "rating": NaN, "timestamp": 956679248.0}
{"user_id": 6040, "movie_id": 2858, "rating": NaN, "timestamp": 956679275.0}
{"user_id": 6040, "movie_id": 246, "rating": NaN, "timestamp": 956679413.0}
{"user_id": 6040, "movie_id": 1617, "rating": NaN, "timestamp": 956679473.0}
{"user_id": 6040, "movie_id": 2324, "rating": NaN, "timestamp": 956679629.0}
{"user_id": 6040, "movie_id": 1089, "rating": NaN, "timestamp": 956679796.0}
{"user_id": 6039, "movie_id": 2804, "rating": NaN, "timestamp": 956680123.0}
{"user_id": 6039, "movie_id": 933, "rating": NaN, "timestamp": 956680270.0}
{"user_id": 6039, "movie_id": 1304, "rating": NaN, "timestamp": 956680308.0}


In [8]:
#load test data where ratings are NaN
requests = spark.read.json("data/requests.json")
requests.show(5)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|    2019|   NaN|9.56678777E8|   6040|
|     759|   NaN|9.56679248E8|   6040|
|    2858|   NaN|9.56679275E8|   6040|
|     246|   NaN|9.56679413E8|   6040|
|    1617|   NaN|9.56679473E8|   6040|
+--------+------+------------+-------+
only showing top 5 rows



In [9]:
from pyspark.sql.types import (
                                StructType,
                                StructField,
                                IntegerType,
                                FloatType,
                                LongType,)
dir(pyspark.sql.types)

['ArrayType',
 'AtomicType',
 'BinaryType',
 'BooleanType',
 'ByteType',
 'CloudPickleSerializer',
 'DataType',
 'DataTypeSingleton',
 'DateConverter',
 'DateType',
 'DatetimeConverter',
 'DecimalType',
 'DoubleType',
 'FloatType',
 'FractionalType',
 'IntegerType',
 'IntegralType',
 'JavaClass',
 'LongType',
 'MapType',
 'NullType',
 'NumericType',
 'Row',
 'ShortType',
 'SparkContext',
 'StringType',
 'StructField',
 'StructType',
 'TimestampType',
 'UserDefinedType',
 '_FIXED_DECIMAL',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 '_acceptable_types',
 '_all_atomic_types',
 '_all_complex_types',
 '_array_signed_int_typecode_ctype_mappings',
 '_array_type_mappings',
 '_array_unsigned_int_typecode_ctype_mappings',
 '_atomic_types',
 '_check_dataframe_convert_date',
 '_check_dataframe_localize_timestamps',
 '_check_series_convert_date',
 '_check_series_convert_timestamps_internal',
 '_check_series_convert_

In [13]:
users = (spark.read.load("data/users.dat",format="csv", sep=":", inferSchema=True
                       ).drop('_c1','_c3','_c5','_c7'))

In [14]:
users = (users.withColumnRenamed(users.schema.names[0],'userId'
                       ).withColumnRenamed(users.schema.names[1],'gender'
                        ).withColumnRenamed(users.schema.names[2],'age'
                        ).withColumnRenamed(users.schema.names[3],'occupation'
                        ).withColumnRenamed(users.schema.names[4],'zip'))
users.show(5)

+------+------+---+----------+-----+
|userId|gender|age|occupation|  zip|
+------+------+---+----------+-----+
|     1|     F|  1|        10|48067|
|     2|     M| 56|        16|70072|
|     3|     M| 25|        15|55117|
|     4|     M| 45|         7|02460|
|     5|     M| 25|        20|55455|
+------+------+---+----------+-----+
only showing top 5 rows



In [32]:
#load ratings data
ratings = spark.read.json("data/ratings.json")
ratings.show(20)

+--------+------+------------+-------+
|movie_id|rating|   timestamp|user_id|
+--------+------+------------+-------+
|     858|     4|9.56678732E8|   6040|
|    2384|     4|9.56678754E8|   6040|
|     593|     5|9.56678754E8|   6040|
|    1961|     4|9.56678777E8|   6040|
|    1419|     3|9.56678856E8|   6040|
|     213|     5|9.56678856E8|   6040|
|    3111|     5|9.56678856E8|   6040|
|     573|     4|9.56678856E8|   6040|
|    3505|     4|9.56678856E8|   6040|
|    1734|     2|9.56678881E8|   6040|
|    2503|     5|9.56678991E8|   6040|
|     919|     5|9.56678991E8|   6040|
|     912|     5|9.56678991E8|   6040|
|     527|     5|9.56679019E8|   6040|
|    1252|     5|9.56679057E8|   6040|
|     649|     5|9.56679057E8|   6040|
|     318|     4|9.56679057E8|   6040|
|    3289|     5|9.56679105E8|   6040|
|     608|     4|9.56679275E8|   6040|
|    2396|     3|9.56679275E8|   6040|
+--------+------+------------+-------+
only showing top 20 rows



In [33]:
ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [91]:
ratings.where(ratings["rating"].isNull()).show()

+--------+------+---------+-------+
|movie_id|rating|timestamp|user_id|
+--------+------+---------+-------+
+--------+------+---------+-------+



In [34]:
#Shape
print(ratings.count(), len(ratings.columns))

719949 4


In [35]:
#DEFAULTS:
#ALS(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, implicitPrefs=False, 
#alpha=1.0, userCol='user', itemCol='item', seed=None, ratingCol='rating', nonnegative=False, 
#checkpointInterval=10,intermediateStorageLevel='MEMORY_AND_DISK', 
#finalStorageLevel='MEMORY_AND_DISK', coldStartStrategy='nan')

#SPECIFY WHICH COLUMNS ARE USER, ITEM, and RATING
als = ALS(
        rank=10,
        maxIter=10,
        userCol='user_id',
        itemCol='movie_id',
        ratingCol='rating',
        )

In [36]:
#recast timestamp from unix epoch time to readable format
ratings = (ratings.withColumn('timestamp',
                             functions.date_format(
                                 ratings.timestamp.cast(dataType=types.TimestampType()),
                             "yyyy-MM-dd HH:mm:ss")))
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [38]:
ratings.show(20)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
|     213|     5|2000-04-25 09:07:36|   6040|
|    3111|     5|2000-04-25 09:07:36|   6040|
|     573|     4|2000-04-25 09:07:36|   6040|
|    3505|     4|2000-04-25 09:07:36|   6040|
|    1734|     2|2000-04-25 09:08:01|   6040|
|    2503|     5|2000-04-25 09:09:51|   6040|
|     919|     5|2000-04-25 09:09:51|   6040|
|     912|     5|2000-04-25 09:09:51|   6040|
|     527|     5|2000-04-25 09:10:19|   6040|
|    1252|     5|2000-04-25 09:10:57|   6040|
|     649|     5|2000-04-25 09:10:57|   6040|
|     318|     4|2000-04-25 09:10:57|   6040|
|    3289|     5|2000-04-25 09:11:45|   6040|
|     608|     4|2000-04-25 09:14:

In [41]:
ratings = ratings.orderBy('timestamp')
ratings.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [106]:
ratings.orderBy('user_id').show()

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|    1172|     5|2000-12-01 22:50:20|    635|
|    2686|     5|2000-12-01 22:46:54|    635|
|    1960|     5|2000-12-01 22:50:20|    635|
|    3948|     4|2000-12-01 22:44:54|    635|
|    3751|     5|2000-12-01 22:46:11|    635|
|    1279|     5|2000-12-01 22:48:40|    635|
|     296|     4|2000-12-01 22:50:20|    635|
|     920|     4|2000-12-01 22:38:31|    635|
|    1286|     4|2000-12-01 22:41:46|    635|
|    3720|     5|2000-12-01 22:45:38|    635|
|    3035|     5|2000-12-01 22:50:20|    635|
|    3916|     3|2000-12-01 22:47:20|    635|
|    3067|     4|2000-12-01 22:48:40|    635|
|    1217|     4|2000-12-01 22:48:40|    635|
|    3183|     4|2000-12-01 22:48:40|    635|
|    2294|     5|2000-12-01 22:37:41|    635|
|    3614|     4|2000-12-01 22:38:31|    635|
|    3528|     5|2000-12-01 22:41:46|    635|
|    1270|     4|2000-12-01 22:41:

In [50]:
ratings.show(5)

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     858|     4|2000-04-25 09:05:32|   6040|
|    2384|     4|2000-04-25 09:05:54|   6040|
|     593|     5|2000-04-25 09:05:54|   6040|
|    1961|     4|2000-04-25 09:06:17|   6040|
|    1419|     3|2000-04-25 09:07:36|   6040|
+--------+------+-------------------+-------+
only showing top 5 rows



In [51]:
print((ratings.count(), len(ratings.columns)))

(719949, 4)


In [52]:
719949*0.8

575959.2000000001

In [55]:
ratings_train = ratings.sort(ratings.timestamp.asc()).limit(575959)
ratings_train.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [108]:
ratings_train.orderBy('user_id').show()

+--------+------+-------------------+-------+
|movie_id|rating|          timestamp|user_id|
+--------+------+-------------------+-------+
|     912|     5|2000-11-20 00:27:25|   1570|
|    2473|     1|2000-11-20 00:27:25|   1570|
|     110|     3|2000-11-20 00:26:54|   1570|
|    2502|     4|2000-11-20 00:26:54|   1570|
|    2410|     3|2000-11-20 00:24:18|   1571|
|    1210|     3|2000-11-20 00:24:18|   1571|
|     593|     5|2000-11-20 00:25:05|   1571|
|      34|     4|2000-11-20 00:25:05|   1571|
|    1198|     5|2000-11-20 00:25:05|   1571|
|      95|     3|2000-11-20 00:25:05|   1571|
|    3418|     5|2000-11-20 00:24:18|   1571|
|    1259|     4|2000-11-20 00:27:45|   1571|
|    1172|     5|2000-11-20 00:27:45|   1571|
|    1197|     4|2000-11-20 00:28:10|   1571|
|    2502|     2|2000-11-20 00:25:05|   1571|
|     195|     4|2000-11-20 00:25:05|   1571|
|    1270|     4|2000-11-20 00:27:45|   1571|
|    3424|     5|2000-11-20 00:27:45|   1571|
|    3361|     4|2000-11-20 00:27:

In [56]:
ratings_test = ratings.subtract(ratings_train)
ratings_test.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint]

In [57]:
print(f"Train DF Shape:{(ratings_train.count(), len(ratings_train.columns))}") #~80%
print(f"Test DF Shape:{(ratings_test.count(), len(ratings_test.columns))}") #~20%

Train DF Shape:(575959, 4)
Test DF Shape:(143990, 4)


In [58]:
als_model = als.fit(ratings_train)

In [59]:
predictions = als_model.transform(ratings_train)
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, timestamp: string, user_id: bigint, prediction: float]

In [61]:
predictions.sort(ratings.timestamp.asc()).show(20)

+--------+------+-------------------+-------+----------+
|movie_id|rating|          timestamp|user_id|prediction|
+--------+------+-------------------+-------+----------+
|     858|     4|2000-04-25 09:05:32|   6040|  4.221436|
|     593|     5|2000-04-25 09:05:54|   6040| 3.7021375|
|    2384|     4|2000-04-25 09:05:54|   6040|  3.347498|
|    1961|     4|2000-04-25 09:06:17|   6040| 3.3244977|
|     213|     5|2000-04-25 09:07:36|   6040| 3.9047341|
|    1419|     3|2000-04-25 09:07:36|   6040| 3.4164295|
|    3505|     4|2000-04-25 09:07:36|   6040| 3.2255454|
|    3111|     5|2000-04-25 09:07:36|   6040| 3.3304982|
|     573|     4|2000-04-25 09:07:36|   6040| 2.9886765|
|    1734|     2|2000-04-25 09:08:01|   6040|  3.291084|
|     912|     5|2000-04-25 09:09:51|   6040|  4.284761|
|    2503|     5|2000-04-25 09:09:51|   6040|  4.460045|
|     919|     5|2000-04-25 09:09:51|   6040| 3.7702155|
|     527|     5|2000-04-25 09:10:19|   6040|  3.985158|
|    1252|     5|2000-04-25 09:

In [83]:
predictions.dtypes

[('movie_id', 'bigint'),
 ('rating', 'bigint'),
 ('timestamp', 'string'),
 ('user_id', 'bigint'),
 ('prediction', 'float')]

In [63]:
user_factors = als_model.userFactors
user_factors.persist()

DataFrame[id: int, features: array<float>]

In [107]:
user_factors.orderBy('id').show(10)

+----+--------------------+
|  id|            features|
+----+--------------------+
|1570|[-0.56166136, -0....|
|1571|[-0.85663205, 0.0...|
|1572|[-0.93184286, 0.3...|
|1573|[-0.13954364, 0.4...|
|1574|[-2.4583068, 0.84...|
|1575|[-1.2614186, -0.0...|
|1576|[-1.3855032, 0.91...|
|1577|[-0.7750076, 0.07...|
|1578|[-0.5048642, 0.98...|
|1579|[-0.61297446, 0.2...|
+----+--------------------+
only showing top 10 rows



In [67]:
print(f"User factors shape:{user_factors.count(),len(user_factors.columns)}")

User factors shape:(4464, 2)


In [71]:
#first Users
!head -n 15 data/users.dat

1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
8::M::25::12::11413
9::M::25::17::61614
10::F::35::1::95370
11::F::25::1::04093
12::M::25::12::32793
13::M::45::1::93304
14::M::35::0::60126
15::M::25::7::22903


In [65]:
item_factors = als_model.itemFactors
item_factors.persist()

DataFrame[id: int, features: array<float>]

In [82]:
item_factors.sort(item_factors.id.asc()).show(20)

+---+--------------------+
| id|            features|
+---+--------------------+
|  1|[-0.78978753, 0.6...|
|  2|[-0.7596844, 0.30...|
|  3|[-0.7476781, 0.31...|
|  4|[-0.8348807, 0.07...|
|  5|[-0.7923992, 0.39...|
|  6|[-0.35641932, 0.2...|
|  7|[-0.8956462, 0.43...|
|  8|[-1.0695194, 0.06...|
|  9|[-0.037494857, 0....|
| 10|[-0.42211238, 0.5...|
| 11|[-0.60971487, 0.5...|
| 12|[-1.0284107, 0.13...|
| 13|[-0.83076036, -0....|
| 14|[-0.5794247, -0.0...|
| 15|[-0.6422041, -0.0...|
| 16|[-0.47119313, 0.0...|
| 17|[-0.8232782, 0.43...|
| 18|[-0.41385677, 0.3...|
| 19|[-0.22403774, 0.5...|
| 20|[-0.20548417, 0.2...|
+---+--------------------+
only showing top 20 rows



In [69]:
print(f"Item factors shape:{item_factors.count(),len(item_factors.columns)}")

Item factors shape:(3576, 2)


In [70]:
#First Items
!head -n 15 data/movies.dat

1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
10::GoldenEye (1995)::Action|Adventure|Thriller
11::American President, The (1995)::Comedy|Drama|Romance
12::Dracula: Dead and Loving It (1995)::Comedy|Horror
13::Balto (1995)::Animation|Children's
14::Nixon (1995)::Drama
15::Cutthroat Island (1995)::Action|Adventure|Romance


In [80]:
user_1570_row = user_factors[user_factors['id'] == 1570].first()
user_1570_row

Row(id=1570, features=[-0.5616613626480103, -0.157181978225708, -0.7332372665405273, 0.1095866784453392, 0.2573540508747101, -1.257413387298584, 1.3740884065628052, 0.5832592248916626, 0.7984881401062012, -0.8602371215820312])

In [77]:
user_1570_factors = np.array(user_1570_row['features'])
user_1570_factors

array([-0.56166136, -0.15718198, -0.73323727,  0.10958668,  0.25735405,
       -1.25741339,  1.37408841,  0.58325922,  0.79848814, -0.86023712])

In [97]:
movie_110_row = item_factors[item_factors['id'] == 110].first()

In [98]:
movie_110_factors = np.array(movie_110_row['features'])

In [99]:
user_1570_factors @ movie_110_factors

3.2508053430178467

In [100]:
user_1750_preds = predictions[predictions['user_id'] == 1570]

In [101]:
user_1750_preds.sort('movie_id').show()

+--------+------+-------------------+-------+----------+
|movie_id|rating|          timestamp|user_id|prediction|
+--------+------+-------------------+-------+----------+
|     110|     3|2000-11-20 00:26:54|   1570| 3.2508054|
|     912|     5|2000-11-20 00:27:25|   1570| 4.5508895|
|    2473|     1|2000-11-20 00:27:25|   1570|  1.271153|
|    2502|     4|2000-11-20 00:26:54|   1570| 3.5577679|
+--------+------+-------------------+-------+----------+



In [102]:
recs = als_model.recommendForAllUsers(numItems=10)

In [103]:
recs.persist().show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1580|[[3338, 4.5225854...|
|   4900|[[751, 5.576459],...|
|   5300|[[2309, 5.3587584...|
|   1591|[[572, 6.1320605]...|
|   4101|[[3523, 5.048921]...|
|   2122|[[572, 4.808478],...|
|   2142|[[572, 4.916417],...|
|   5803|[[3338, 5.0024242...|
|   3794|[[3338, 4.8726025...|
|   1645|[[2342, 5.370462]...|
|   3175|[[2192, 5.279993]...|
|   4935|[[3569, 4.692035]...|
|   2366|[[572, 4.666449],...|
|   2866|[[572, 4.463551],...|
|   5156|[[2192, 5.8179727...|
|   3997|[[572, 4.8999925]...|
|   3918|[[960, 4.993689],...|
|   4818|[[2192, 5.0776353...|
|   5518|[[572, 5.211287],...|
|   1829|[[1859, 4.7013426...|
+-------+--------------------+
only showing top 20 rows



In [104]:
recs[recs['user_id']==1570].first()['recommendations']

[Row(movie_id=2192, rating=5.679603099822998),
 Row(movie_id=2962, rating=5.572779655456543),
 Row(movie_id=751, rating=5.157539367675781),
 Row(movie_id=2342, rating=5.125278949737549),
 Row(movie_id=2931, rating=5.1087751388549805),
 Row(movie_id=771, rating=5.102655410766602),
 Row(movie_id=2309, rating=5.032965183258057),
 Row(movie_id=1567, rating=4.972918510437012),
 Row(movie_id=669, rating=4.939802646636963),
 Row(movie_id=668, rating=4.932856559753418)]

In [None]:
prediction['']