In [None]:
# Install spark
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[?25hCollecting py4j
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 39.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=a9b49a1051a1689624ccc26e8e9309852a98184ee4137ace45bf9fa1fdebe9f9
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator

In [None]:
# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("song_recommender")
    .getOrCreate()
)

In [None]:
# Load the listening-history CSV into a Spark DataFrame
listening_csv_path = "/content/drive/MyDrive/DS-Master studije/Rukovanje velikim podacima (Big Data)/UserTasteProfilePySpark.csv"
df = spark.read.csv(
    listening_csv_path,
    header=True,       # first line of the file holds the column names
    inferSchema=True,  # let Spark detect the column types instead of reading strings
)

In [None]:
# Preview the first rows of the listening history
df.show()

+------+------+-----+
|userId|songId|plays|
+------+------+-----+
|     1|     1|  1.0|
|     1|     2|  2.0|
|     1|     3|  1.0|
|     1|     4|  1.0|
|     1|     5|  1.0|
|     1|     6|  5.0|
|     1|     7|  1.0|
|     1|     8|  1.0|
|     1|     9|  1.0|
|     1|    10|  1.0|
|     1|    11|  1.0|
|     1|    12|  5.0|
|     1|    13|  1.0|
|     1|    14|  1.0|
|     1|    15|  5.0|
|     1|    16|  1.0|
|     1|    17|  6.0|
|     1|    18|  1.0|
|     1|    19|  1.0|
|     1|    20|  1.0|
+------+------+-----+
only showing top 20 rows



In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|            songId|             plays|
+-------+------------------+------------------+------------------+
|  count|            124566|            124566|            124566|
|   mean| 2390.164515196763|3429.9480596631506| 3.039970778543102|
| stddev|1395.8809554795125|2738.4497552680705|6.8210680790211065|
|    min|                 1|                 1|               1.0|
|    max|              4752|              9976|             796.0|
+-------+------------------+------------------+------------------+



In [None]:
# Create train (80%) and test (20%) sets.
# A fixed seed makes the split repeatable, so the row counts and every metric computed
# further down can be reproduced after a kernel restart.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Number of rows that ended up in the training split
training.count()

99522

In [None]:
# Number of rows that ended up in the test split
test.count()

25044

In [None]:
# Create ALS model.
# - coldStartStrategy="drop" removes predictions for users/songs unseen during fitting,
#   so the evaluator never receives NaN predictions.
# - nonnegative=True constrains the learned factors to be non-negative.
# - seed fixes the random initialisation so repeated runs give the same model.
# NOTE(review): `plays` looks like a play count (implicit feedback); consider whether
# implicitPrefs=True is the better fit for this data — confirm before changing.
als = ALS(
    userCol="userId",
    itemCol="songId",
    ratingCol="plays",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)

In [None]:
# Hyper-parameter grid to search: 2 ranks x 2 iteration counts x 2 regularisation
# strengths = 8 candidate configurations
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [9, 15])
    .addGrid(als.maxIter, [3, 4])
    .addGrid(als.regParam, [0.01, 0.09])
    .build()
)

In [None]:
# Define the evaluator: RMSE between the observed `plays` and the model's `prediction`
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="plays",
    metricName="rmse",
)

In [None]:
# Build 5-fold cross-validation over the parameter grid using CrossValidator.
# A fixed seed keeps the fold assignment identical between runs, so the selected
# hyper-parameters do not change just because the folds were drawn differently.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training data.
# This fits an ALS model for every grid point on every fold, which makes it the
# most expensive cell of the notebook.
model = cv.fit(training)

In [None]:
# Extract the best model selected by the cross-validated grid search
best_model = model.bestModel

In [None]:
# Generate predictions for the held-out test split and evaluate them using RMSE.
# NOTE: the ALS estimator is configured with coldStartStrategy="drop", so test rows
# without a usable prediction are dropped and do not contribute to the RMSE.
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
# Print evaluation metrics and model parameters.
# `rank` is exposed on the fitted ALS model. maxIter and regParam are read from the
# parameter map with the best average cross-validation metric, using the public
# CrossValidatorModel API instead of reaching into the private `_java_obj` handle.
avg_metrics = model.avgMetrics
if evaluator.isLargerBetter():
  best_index = max(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
else:
  # RMSE: smaller is better
  best_index = min(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
best_params = model.getEstimatorParamMaps()[best_index]

print("RMSE = " + str(rmse))
print("**Best Model**")
print("\tRank:", best_model.rank)
print("\tMaxIter:", best_params[als.maxIter])
print("\tRegParam:", best_params[als.regParam])

RMSE = 8.824319434569002
**Best Model**
	Rank: 15
	MaxIter: 4
	RegParam: 0.09


In [None]:
# Compare observed and predicted play counts, ordered by user and then by observed plays
predictions.orderBy("userId", "plays").show()

+------+------+-----+----------+
|userId|songId|plays|prediction|
+------+------+-----+----------+
|     1|    39|  1.0|0.78157336|
|     1|     3|  1.0|0.78499407|
|     1|     7|  1.0| 14.229012|
|     1|    34|  1.0|   1.99155|
|     1|    45|  1.0|  6.071881|
|     1|    24|  1.0| 2.1305244|
|     1|    19|  1.0| 0.6143705|
|     1|    32|  1.0| 3.8728986|
|     1|    36|  1.0|0.48036748|
|     1|    23|  1.0| 1.9935567|
|     1|     2|  2.0|  5.844646|
|     1|    15|  5.0|  2.061989|
|     3|    49|  1.0| 0.6763756|
|     5|    62|  1.0|0.75203264|
|     5|    69|  1.0| 2.9096985|
|     5|    71|  1.0| 5.9639063|
|     5|    70|  9.0|  2.833328|
|     6|    79|  4.0| 2.9303856|
|     6|    78|  6.0| 3.3957558|
|     6|    86|  8.0| 7.8709383|
+------+------+-----+----------+
only showing top 20 rows



In [None]:
# Generate the top 10 song recommendations for each user
user_recommendations = best_model.recommendForAllUsers(10)



In [None]:
# Show 20 users together with their recommended song ids (untruncated)
(
    user_recommendations
    .select("userId", "recommendations.songId")
    .show(n=20, truncate=False)
)

+------+------------------------------------------------------------+
|userId|songId                                                      |
+------+------------------------------------------------------------+
|1     |[2669, 4680, 9001, 2680, 8844, 4139, 1620, 5402, 2989, 88]  |
|3     |[8844, 2669, 7791, 8617, 4680, 2509, 2570, 9001, 8606, 2413]|
|5     |[2669, 4680, 2680, 9001, 1620, 8844, 7217, 2989, 5402, 6966]|
|6     |[2680, 8617, 7791, 4638, 2669, 8844, 5109, 2509, 4026, 9488]|
|9     |[8844, 2509, 8617, 8606, 4638, 5369, 8145, 9686, 5155, 7791]|
|12    |[8844, 2570, 4139, 6043, 9001, 6047, 7791, 8617, 2680, 9488]|
|13    |[1355, 2413, 6990, 5745, 2669, 4680, 8655, 3999, 7054, 1562]|
|15    |[5624, 7950, 8145, 2669, 479, 7791, 6047, 2509, 2710, 6961] |
|16    |[4680, 2669, 2570, 8844, 2680, 6043, 9001, 7217, 6966, 1620]|
|17    |[2669, 7791, 2570, 4680, 6043, 2680, 8606, 8381, 8844, 1710]|
|19    |[8844, 8617, 2669, 4680, 2680, 3866, 6047, 2509, 2570, 2989]|
|20    |[8844, 7791,

In [None]:
# Load the song metadata CSV into a Spark DataFrame
songs_csv_path = "/song_data.csv"
songs_df = spark.read.csv(
    songs_csv_path,
    header=True,       # first line of the file holds the column names
    inferSchema=True,  # let Spark detect the column types instead of reading strings
)

In [None]:
# Preview the first rows of the song metadata
songs_df.show()

+------------------+--------------------+--------------------+--------------------+----+----+----+----+
|           song_id|               title|          album_name|         artist_name|year| _c5| _c6| _c7|
+------------------+--------------------+--------------------+--------------------+----+----+----+----+
|SOQMMHC12AB0180CB8|        Silent Night|Monster Ballads X...|    Faster Pussy cat|2003|null|null|null|
|SOVFVAK12A8C1350D9|         Tanssi vaan|         Karkuteillä|    Karkkiautomaatti|1995|null|null|null|
|SOGTUKN12AB017F4F1|   No One Could Ever|              Butter|      Hudson Mohawke|2006|null|null|null|
|SOBNYVR12A8C13558C|       Si Vos Querés|             De Culo|         Yerba Brava|2003|null|null|null|
|SOHSBXH12A8C13B0DF|    Tangle Of Aspens|Rene Ablaze Prese...|          Der Mystic|   0|null|null|null|
|SOZVAPQ12A8C13B63C|"Symphony No. 1 G...|Berwald: Symphoni...|    David Montgomery|   0|null|null|null|
|SOQVRHI12A6D4FB2D7|    We Have Got Love|Strictly The Best...|  

In [None]:
# Number of rows in the song metadata table
songs_df.count()

1000000

In [None]:
# Materialise every song row as a Python list on the driver so rows can be looked up
# by position below. NOTE: this pulls the whole table (1,000,000 rows per the count
# above) into driver memory.
songs_collection = songs_df.collect()

In [None]:
# Spot-check a handful of rows of the collected song table as "artist - title".
# The numbers are positions in songs_collection (0-based list indices).
for row_index in [10, 150, 440, 321, 4050, 3444, 1233, 9382]:
  song_row = songs_collection[row_index]
  print(f"{song_row.artist_name} - {song_row.title}")

3 Gars Su'l Sofa - L'antarctique
Debbie Davies - You Don't Know What You're Doing
Good Riddance - Yesterday's Headlines
Sir Neville Marriner/Academy of St Martin-in-the-Fields - Serenade No. 13 in G_ 'Eine kleine Nachtmusik' K525 (1997 Digital Remaster): III.  Menuetto (Allegretto)
Head Hits Concrete - No Longer Among Us
Bluvertigo - Versozero
Céline Dion - A cause
Natalie Cole - Your Lonely Heart (Digitally Remastered 02)


In [None]:
# Show the list of recommended song ids for one specific user
recommendation_for_specific_user = (
    user_recommendations
    .select("userId", "recommendations.songId")
    .where("userId = 3500")
)
recommendation_for_specific_user.show(n=20, truncate=False)

+------+-----------------------------------------------------------+
|userId|songId                                                     |
+------+-----------------------------------------------------------+
|3500  |[6047, 8844, 3166, 7061, 8145, 2509, 5155, 307, 4638, 8606]|
+------+-----------------------------------------------------------+



In [None]:
# Bring the recommended song ids for the selected user back to the driver.
# first() returns None when the filter matched no row (for example when the model has
# no recommendations for that user), so fall back to an empty list instead of raising
# IndexError the way collect()[0] would.
specific_user_row = recommendation_for_specific_user.first()
songs_for_specific_user = specific_user_row["songId"] if specific_user_row is not None else []
songs_for_specific_user

[6047, 8844, 3166, 7061, 8145, 2509, 5155, 307, 4638, 8606]

In [None]:
# Print "artist - title" for every song recommended to the selected user.
# NOTE(review): the recommended songId is used directly as a 0-based position in
# songs_collection. Nothing in this notebook shows that the integer song ids were
# assigned in song_data.csv row order (and the ids start at 1) — confirm the mapping,
# otherwise the titles printed here do not belong to the recommended songs.
for recommended_id in songs_for_specific_user:
  song_row = songs_collection[recommended_id]
  print(f"{song_row.artist_name} - {song_row.title}")

Joe Stump - Night Of The Living Shred
Bow Wave - MESTO V MLZE
Noitalinna Huraa! - Kurja kissa
Soilwork - Breeding Thorns
Mikey Dread - Stem Cells
Johnny Foreigner - Criminals
Tim Carroll - Keep Me Down
Virus - Carheart
Hangedup - Eksplozije
Mike Farris - I'm Gonna Get There
