In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=972474b632f7f9dc8052a2351991b273ec9c92c721902ce501124d4fb7430bd1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
from pyspark.sql.functions import col, explode


In [4]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [5]:
from pyspark.sql import SparkSession
#spark.stop()
spark = SparkSession \
.builder \
.config("spark.executor.instances", "5")\
.config('spark.driver.memory','1g')\
.config('spark.executor.memory', '2g') \
.getOrCreate()

In [6]:
df = spark.read.csv('/content/gdrive/MyDrive/musicrecom/melodice/user_song_with_more_listenings.csv', sep=',', inferSchema=True, header=True, nullValue='NA', nanValue='NA',emptyValue='NA').dropna()


In [7]:
df

DataFrame[link_id: int, cookieid: string, listening_count: int]

In [8]:
from pyspark.sql.functions import split, concat_ws
df = df.withColumn('cookieid', concat_ws('', split(df['cookieid'], '-').getItem(0), split(df['cookieid'], '-').getItem(1)))

In [9]:
df.show()

+-------+--------------+---------------+
|link_id|      cookieid|listening_count|
+-------+--------------+---------------+
|    444|14932231982246|              1|
|    444|14982872233938|              1|
|    445|14656624431950|              1|
|    445|14679500277317|              1|
|    445|14685351613766|              1|
|    445|14700003855348|              2|
|    445|14718532241859|              1|
|    445|14747157932510|              3|
|    445|14754104719423|              1|
|    445|14759470262098|              3|
|    445|14779018817362|              1|
|    445|14793081151516|              1|
|    445|14794998942955|              1|
|    445|14797658782539|              1|
|    445|14809469626288|              1|
|    445|14817385912078|              1|
|    445|14830216339701|              1|
|    445|14833119023401|              1|
|    445|14850214903835|              4|
|    445|14853441377180|              1|
+-------+--------------+---------------+
only showing top

In [10]:
df = df.withColumn('cookieid', col('cookieid').cast('integer')).\
    withColumn('link_id', col('link_id').cast('integer')).\
    withColumn('listening_count', col('listening_count').cast('integer')).dropna()


In [11]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [12]:
(train, test) = df.randomSplit([0.7, 0.3], seed = 1234)
als = ALS(userCol="cookieid", itemCol="link_id", ratingCol="listening_count", nonnegative = True, implicitPrefs = False)

In [13]:
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [50, 100,]) \
            .addGrid(als.regParam, [.1, .15]) \
            .build()
            # .addGrid(als.maxIter, [5, 50, 100, 200]) \

evaluator = RegressionEvaluator(metricName="rmse", labelCol="listening_count", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  4


In [14]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)


In [15]:
model = cv.fit(train)
best_model = model.bestModel
best_model

ALSModel: uid=ALS_0f7d9b78f614, rank=50

In [16]:
best_model.write().overwrite().save('/content/gdrive/MyDrive/musicrecom/als')

In [17]:
test_predictions = best_model.transform(test)

In [18]:
test_predictions.show()


+-------+----------+---------------+----------+
|link_id|  cookieid|listening_count|prediction|
+-------+----------+---------------+----------+
|    925|1464810685|              1| 2.7671483|
|    905|         0|              1|       NaN|
|   1869|         0|              1|       NaN|
|    927|1464810685|              1|0.69178706|
|    932|         0|              1|       NaN|
|   3250|         0|              1|       NaN|
|   3404|         0|              1|       NaN|
|   3407|         0|              1|       NaN|
|    899|         0|              5|       NaN|
|   2521|         0|              1|       NaN|
|   3357|         0|              1|       NaN|
|   3396|         0|              1|       NaN|
|   2396|         0|              1|       NaN|
|   2525|         0|              1|       NaN|
|   3175|         0|              1|       NaN|
|   3360|         0|              1|       NaN|
|   3402|         0|              1|       NaN|
|   3403|         0|              1|    

In [19]:
nrecommendations = best_model.recommendForAllUsers(10)
nrecommendations.limit(10).show()

+----------+--------------------+
|  cookieid|     recommendations|
+----------+--------------------+
|         0|[{925, 23.664034}...|
|1464810685|[{925, 2.7671483}...|
+----------+--------------------+



In [21]:
nrecommendations = nrecommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('cookieid', col("rec_exp.link_id"), col("rec_exp.rating"))

nrecommendations.limit(10).show()

+--------+-------+---------+
|cookieid|link_id|   rating|
+--------+-------+---------+
|       0|    925|23.664034|
|       0|    926| 8.523148|
|       0|   3260|5.9160085|
|       0|    927|5.9160085|
|       0|   3263|4.9300075|
|       0|   3262|4.9300075|
|       0|   3261|4.9300075|
|       0|    928|4.9300075|
|       0|   3265| 3.944006|
|       0|   3264| 3.944006|
+--------+-------+---------+

