# Spark
## Recommender system
### Code along

In [3]:
from tqdm.notebook import tqdm

import numpy as np
import pandas as pd

import findspark

from pyspark import SparkConf
from pyspark import SparkContext

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.ml import Pipeline

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

from pyspark.ml.recommendation import ALS

from pyspark.ml.evaluation import RegressionEvaluator

from matplotlib import pyplot as plt
import seaborn as sns

import optuna

In [4]:
findspark.init()
findspark.find()

%matplotlib inline
sns.set_theme(style='darkgrid')
sns.set_context("notebook", rc={"lines.linewidth": 2.5})

In [5]:
random_seed = 1

In [6]:
conf = SparkConf() \
    .setAppName("recommender") \
    #.setMaster('local') \
    #.set('spark.executor.memory', '8g') \
    #.set('spark.driver.maxResultSize', '8g') \
    #.set("spark.memory.fraction", "0.6") \
    #.set("spark.memory.storageFraction", "0.5") \
    #.set("spark.sql.shuffle.partitions", "5") \
    #.set("spark.memory.offHeap.enabled", "false") \
    #.set("spark.reducer.maxSizeInFlight", "96m") \
    #.set("spark.shuffle.file.buffer", "256k") \
    #.set("spark.sql.debug.maxToStringFields", "100") \
    #.set('spark.sql.autoBroadcastJoinThreshold', '-1')

In [7]:
%%capture

spark = SparkSession.builder.config(conf=conf).getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/11/09 13:55:08 WARN Utils: Your hostname, Diego-desktop resolves to a loopback address: 127.0.1.1; using 172.27.76.109 instead (on interface eth0)
23/11/09 13:55:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/09 13:55:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
df = spark.read.csv('../data/movielens_ratings.csv', header=True, inferSchema=True)

                                                                                

In [10]:
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [11]:
df.show(5)

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
+-------+------+------+
only showing top 5 rows



In [20]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [27]:
df.groupBy('userId').agg({'userId' : 'count'}).orderBy('count(userId)').show()

+------+-------------+
|userId|count(userId)|
+------+-------------+
|    10|           44|
|    16|           45|
|    27|           46|
|    17|           46|
|    25|           46|
|    29|           46|
|     2|           46|
|    20|           47|
|    13|           48|
|     3|           48|
|    15|           48|
|    21|           48|
|    26|           49|
|     1|           49|
|     5|           49|
|    19|           49|
|     8|           49|
|     0|           49|
|    28|           50|
|    23|           52|
+------+-------------+
only showing top 20 rows



In [16]:
df.groupBy('movieId').agg({'movieId' : 'count'}).orderBy('count(movieId)').show()

+-------+--------------+
|movieId|count(movieId)|
+-------+--------------+
|      8|             7|
|     42|             8|
|     93|            10|
|     41|            10|
|     46|            10|
|     16|            11|
|     34|            11|
|     76|            11|
|     65|            11|
|     80|            11|
|     89|            11|
|     53|            12|
|     28|            12|
|     57|            12|
|     32|            12|
|     75|            12|
|     11|            12|
|     74|            12|
|      3|            13|
|      1|            13|
+-------+--------------+
only showing top 20 rows



In [29]:
train, test = df.randomSplit([0.8, 0.2])

In [30]:
ALS?

[0;31mInit signature:[0m
[0mALS[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0;34m*[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mrank[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m10[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mmaxIter[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m10[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mregParam[0m[0;34m:[0m [0mfloat[0m [0;34m=[0m [0;36m0.1[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mnumUserBlocks[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m10[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mnumItemBlocks[0m[0;34m:[0m [0mint[0m [0;34m=[0m [0;36m10[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mimplicitPrefs[0m[0;34m:[0m [0mbool[0m [0;34m=[0m [0;32mFalse[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0malpha[0m[0;34m:[0m [0mfloat[0m [0;34m=[0m [0;36m1.0[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0muserCol[0m[0;34m:[0m [0mstr[0m [0;34m=[0m [0;34m'user'[0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0m

In [34]:
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

In [35]:
model = als.fit(train)

In [36]:
predictions = model.transform(test)

In [38]:
predictions.describe().show()

23/11/09 14:58:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----------------+------------------+------------------+------------------+
|summary|          movieId|            rating|            userId|        prediction|
+-------+-----------------+------------------+------------------+------------------+
|  count|              454|               454|               454|               454|
|   mean|48.37224669603524|1.6872246696035242|14.288546255506608|1.1732791665744544|
| stddev|28.56154387914279|1.1676147863777637| 8.481794611081227| 1.577261698652785|
|    min|                0|               1.0|                 0|        -3.3986764|
|    max|               99|               5.0|                29|          8.249246|
+-------+-----------------+------------------+------------------+------------------+



In [37]:
predictions.show()

+-------+------+------+------------+
|movieId|rating|userId|  prediction|
+-------+------+------+------------+
|      1|   1.0|    28| -0.78622735|
|      2|   4.0|    28|   -2.377852|
|      1|   1.0|    26| -0.10417879|
|      2|   1.0|    26|   4.5013847|
|      2|   2.0|     1|   1.8368177|
|      0|   1.0|    13|     0.61854|
|      0|   1.0|     6|  0.05217888|
|      1|   1.0|     3| -0.25462055|
|      2|   1.0|     3|  -2.5422375|
|      0|   1.0|    20|   1.4808191|
|      1|   1.0|    20|  -0.4175865|
|      2|   2.0|    20|   1.0088059|
|      0|   1.0|    19| -0.68485904|
|      1|   4.0|    15|   1.0826527|
|      3|   1.0|    17|   0.7915948|
|      1|   1.0|     4|   1.9669614|
|      3|   1.0|     7|   1.3515427|
|      0|   3.0|    10|-0.039607756|
|      3|   1.0|    21|  0.60002625|
|      1|   1.0|    14|  0.71497095|
+-------+------+------+------------+
only showing top 20 rows



In [39]:
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)

In [41]:
rmse = evaluator.evaluate(predictions)
rmse

1.9160324823642627

In [44]:
single_user = test.filter(test['userId'] == 11).select(['movieId', 'userId'])
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      9|    11|
|     16|    11|
|     20|    11|
|     22|    11|
|     23|    11|
|     25|    11|
|     27|    11|
|     36|    11|
|     39|    11|
|     47|    11|
|     62|    11|
|     64|    11|
|     69|    11|
|     71|    11|
|     75|    11|
|     76|    11|
|     81|    11|
|     82|    11|
|     86|    11|
|     97|    11|
+-------+------+



In [45]:
recommendations = model.transform(single_user)

In [46]:
recommendations.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     75|    11| 6.0656123|
|     69|    11|  4.651586|
|     36|    11| 3.8258572|
|     64|    11|  2.342993|
|     82|    11| 2.0634267|
|     22|    11| 2.0448046|
|     76|    11| 1.8864181|
|     71|    11|  1.804649|
|     23|    11| 1.2146819|
|     27|    11| 1.0672005|
|     81|    11| 1.0047067|
|     62|    11|0.42403275|
|     20|    11|0.32256064|
|     39|    11|0.26840913|
|     86|    11|0.17163298|
|     47|    11|-0.7469984|
|     97|    11|-1.1183798|
|     16|    11|-1.2208127|
|      9|    11| -2.372209|
|     25|    11|-2.4871178|
+-------+------+----------+

