In [0]:
# Sanity check: list the uploaded ratings file in DBFS before trying to read it.
uploaded_files = dbutils.fs.ls("FileStore/tables/movielens-1.txt")
display(uploaded_files)

path,name,size
dbfs:/FileStore/tables/movielens-1.txt,movielens-1.txt,29359


In [0]:
# Load the raw ratings file as plain text: every input line becomes one row
# in a single string column named "value" (it is split into fields afterwards).
#
# The text source is selected directly through DataFrameReader.text().
# A format name of "txt" is not a built-in Spark data source (the built-in
# one is "text"), and "inferSchema"/"header" are CSV reader options that
# have no meaning for the text source, so none of them are set here.
file = 'dbfs:/FileStore/tables/movielens-1.txt'
df = spark.read.text(file)

# Preview the first raw lines to confirm the delimiter and field layout.
df.show(20)

+-----------------+
|            value|
+-----------------+
| 0:2:3:1424380312|
| 0:3:1:1424380312|
| 0:5:2:1424380312|
| 0:9:4:1424380312|
|0:11:1:1424380312|
|0:12:2:1424380312|
|0:15:1:1424380312|
|0:17:1:1424380312|
|0:19:1:1424380312|
|0:21:1:1424380312|
|0:23:1:1424380312|
|0:26:3:1424380312|
|0:27:1:1424380312|
|0:28:1:1424380312|
|0:29:1:1424380312|
|0:30:1:1424380312|
|0:31:1:1424380312|
|0:34:1:1424380312|
|0:37:1:1424380312|
|0:41:2:1424380312|
+-----------------+
only showing top 20 rows



In [0]:
# Split the raw "value" string on ':' into one column per field.
import pyspark.sql.functions as F

split_col = F.split(df.value, ':')
field_names = ['userId', 'movieId', 'rating', 'timestamp']

# Selecting only the extracted fields (in positional order) also leaves the
# original "value" column behind, so no separate drop is required.
df = df.select(*[split_col.getItem(idx).alias(name) for idx, name in enumerate(field_names)])

df.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|     3|1424380312|
|     0|      3|     1|1424380312|
|     0|      5|     2|1424380312|
|     0|      9|     4|1424380312|
|     0|     11|     1|1424380312|
|     0|     12|     2|1424380312|
|     0|     15|     1|1424380312|
|     0|     17|     1|1424380312|
|     0|     19|     1|1424380312|
|     0|     21|     1|1424380312|
+------+-------+------+----------+
only showing top 10 rows



In [0]:
# The split above yields string columns; cast each of them to integer.
from pyspark.sql.types import IntegerType

for column_name in ("userId", "movieId", "rating", "timestamp"):
    df = df.withColumn(column_name, df[column_name].cast(IntegerType()))

# Confirm the resulting column types.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [0]:
# Train an ALS collaborative-filtering model and measure its error on a held-out split.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder  # not used in this cell

# Fixed seed shared by the data split and the ALS factor initialisation.
# Without it, every run produces a different split, model and RMSE, so the
# reported numbers cannot be reproduced.
SEED = 42

# Hold out 20% of the ratings for evaluation.
(training, test) = df.randomSplit([0.8, 0.2], seed=SEED)

# ALS configuration:
#   maxIter            - number of alternating least squares iterations
#   regParam           - regularization parameter (not a learning rate)
#   coldStartStrategy  - "drop" removes test rows whose prediction is NaN
#                        (users/movies with no factors in the trained model),
#                        so the evaluation metric is not NaN itself
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=SEED)

# Fit the model on the training split.
model = als.fit(training)

# Score the held-out ratings and compare predictions with the true ratings.
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

# The metric is the ROOT mean squared error, so the label says so explicitly
# (the previous label described it as the mean squared error).
print("Raiz do erro médio quadrático (RMSE) = " + str(rmse))


Erro médio quadrático = 1.8672471778820656


In [0]:
# Generate the top 10 movie recommendations for every user in the model.
userRec = model.recommendForAllUsers(numItems=10)

# Preview the first 15 users (the recommendations column is truncated by show()).
userRec.show(n=15)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{27, 5.5316653},...|
|    10|[{96, 4.1812196},...|
|     0|[{49, 3.7634022},...|
|     1|[{90, 4.607753}, ...|
|    21|[{29, 5.002292}, ...|
|    11|[{55, 5.742875}, ...|
|    12|[{49, 5.908823}, ...|
|    22|[{59, 5.112343}, ...|
|     2|[{90, 5.8634415},...|
|    13|[{93, 3.502222}, ...|
|     3|[{30, 5.3283367},...|
|    23|[{90, 6.2230487},...|
|     4|[{29, 4.240904}, ...|
|    24|[{69, 5.2349663},...|
|    14|[{52, 4.7690406},...|
+------+--------------------+
only showing top 15 rows



In [0]:
# The reverse view: for every movie, generate the top 10 users to recommend it to.
movieRecs = model.recommendForAllItems(numUsers=10)

# Preview the first 20 movies (the recommendations column is truncated by show()).
movieRecs.show(n=20)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{9, 3.2749887}, ...|
|     40|[{28, 5.130672}, ...|
|     10|[{17, 3.8834913},...|
|     50|[{23, 4.221957}, ...|
|     80|[{26, 4.0670457},...|
|     70|[{2, 4.3705144}, ...|
|     60|[{21, 3.3713374},...|
|     90|[{23, 5.8662024},...|
|     30|[{24, 5.1325297},...|
|      0|[{28, 2.9880567},...|
|     31|[{23, 4.0552073},...|
|     81|[{28, 4.967559}, ...|
|     91|[{12, 3.0997214},...|
|      1|[{15, 3.3060863},...|
|     41|[{14, 3.961575}, ...|
|     61|[{25, 3.256186}, ...|
|     51|[{3, 5.0282035}, ...|
|     21|[{22, 3.4255478},...|
|     11|[{2, 4.2585516}, ...|
|     71|[{25, 3.7496777},...|
+-------+--------------------+
only showing top 20 rows



In [0]:
# Movies recommended to each user: keep only the movie ids and discard the
# predicted ratings stored alongside them in the "recommendations" structs.
user_id_col = userRec['userId']
# Indexing the array-of-structs column by field name yields an array of that field.
recommended_movie_ids_col = userRec['recommendations']['movieId']

UserRecsOnlyItemId = userRec.select(user_id_col, recommended_movie_ids_col)
UserRecsOnlyItemId.show(30)

userId,recommendations.movieId
20,"List(32, 22, 94, 77, 75, 17, 54, 51, 7, 90)"
10,"List(9, 2, 40, 47, 31, 85, 49, 25, 61, 42)"
0,"List(92, 81, 9, 40, 12, 2, 79, 26, 98, 49)"
1,"List(68, 62, 69, 30, 79, 9, 77, 89, 85, 59)"
21,"List(53, 29, 52, 25, 2, 74, 63, 87, 76, 60)"
11,"List(18, 27, 23, 69, 48, 30, 79, 34, 90, 13)"
12,"List(55, 49, 64, 35, 27, 46, 48, 50, 94, 16)"
22,"List(74, 75, 30, 51, 22, 77, 88, 54, 68, 32)"
2,"List(85, 69, 39, 93, 8, 33, 83, 37, 70, 11)"
13,"List(93, 70, 53, 30, 74, 29, 18, 83, 59, 60)"
