# Asssignment 3

In [1]:
import spotipy
import json

from dotenv import load_dotenv
from spotipy.oauth2 import SpotifyOAuth
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lit, rand, udf, to_json, struct
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [2]:
packages = {
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,com.redislabs:spark-redis_2.12:3.1.0"
}

In [3]:
load_dotenv()
spark = (
    SparkSession.builder.appName("assignment3")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.redis.port", "6379")
    .config("spark.redis.host", "127.0.0.1")
    .getOrCreate()
)
producer = KafkaProducer(bootstrap_servers="localhost:9092")
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/home/hajta2/.local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hajta2/.ivy2/cache
The jars for the packages stored in: /home/hajta2/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
com.redislabs#spark-redis_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7f2027f4-4738-41bd-b653-7800fdd768d3;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found com

In [4]:
scope = "user-top-read playlist-modify-public playlist-modify-private"
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))

In [5]:
producer.send("liked_tracks_topic", b"")
producer.send("not_liked_tracks_topic", b"")
producer.send("trending_tracks_topic", b"")
producer.send("audio_features_topic", b"")
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "liked_tracks_topic, audio_features_topic, trending_tracks_topic, not_liked_tracks_topic")
    .option("startingOffsets", "latest")
    .load()
)

In [6]:
liked_tracks_stream = df.selectExpr("CAST(value AS STRING)").filter("topic = 'liked_tracks_topic'")
not_liked_tracks_stream = df.selectExpr("CAST(value AS STRING)").filter("topic = 'not_liked_tracks_topic'")
features_stream = df.selectExpr("CAST(value AS STRING)").filter(
    "topic = 'audio_features_topic'"
)
trending_tracks_stream = df.selectExpr("CAST(value AS STRING)").filter(
    "topic = 'trending_tracks_topic'"
)

In [7]:
liked_tracks_query = liked_tracks_stream.writeStream.format("memory").queryName("liked_tracks").start()
not_liked_tracks_query = not_liked_tracks_stream.writeStream.format("memory").queryName("not_liked_tracks").start()
features_query = (
    features_stream.writeStream.format("memory").queryName("features").start()
)
trending_tracks_query = (
    trending_tracks_stream.writeStream.format("memory")
    .queryName("trending_tracks")
    .start()
)

In [8]:
saved_tracks = sp.current_user_top_tracks(limit=50, offset=0, time_range="short_term")
for track in saved_tracks["items"]:
    producer.send("liked_tracks_topic", json.dumps(track).encode("utf-8"))
    audio_features = sp.audio_features(track["id"])
    producer.send("audio_features_topic", json.dumps(audio_features[0]).encode("utf-8"))

                                                                                

In [9]:
playlist_id = "37i9dQZF1DX8SfyqmSFDwe"
not_liked_tracks = sp.playlist_tracks(playlist_id, limit=100, offset=0)
for item in not_liked_tracks["items"]:
    track = item["track"]
    producer.send("not_liked_tracks_topic", json.dumps(track).encode("utf-8"))
    audio_features = sp.audio_features(track["id"])
    producer.send("audio_features_topic", json.dumps(audio_features[0]).encode("utf-8"))

In [10]:
trending_playlist_id = sp.featured_playlists(limit=1, country="HU", locale="hu_HU")[
    "playlists"
]["items"][0]["id"]
trending_tracks = sp.playlist_tracks(
    trending_playlist_id, limit=50, offset=0, market="HU"
)["items"]
for item in trending_tracks:
    track = item["track"]
    producer.send("trending_tracks_topic", json.dumps(track).encode("utf-8"))
    audio_features = sp.audio_features(track["id"])
    producer.send("audio_features_topic", json.dumps(audio_features[0]).encode("utf-8"))

In [11]:
liked_tracks = spark.sql("select * from liked_tracks")
not_liked_tracks = spark.sql("select * from not_liked_tracks")
features = spark.sql("select * from features")
trending_tracks = spark.sql("select * from trending_tracks")

In [12]:
print(f"Tracks: {liked_tracks.count()}")
print(f"Not liked tracks: {not_liked_tracks.count()}")
print(f"Trending tracks: {trending_tracks.count()}")
print(f"Features: {features.count()}")

Tracks: 50
Not liked tracks: 100
Trending tracks: 50
Features: 200


In [13]:
liked_tracks_query.stop()
not_liked_tracks_query.stop()
features_query.stop()
trending_tracks_query.stop()

In [14]:
liked_tracks.printSchema()
not_liked_tracks_stream.printSchema()
features.printSchema()
trending_tracks.printSchema()

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)



In [15]:
tracks_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("name", StringType(), True),
        StructField("artists", StringType(), True),
        StructField("duration_ms", IntegerType(), True),
        StructField("popularity", FloatType(), True),
    ]
)

liked_tracks_parsed = liked_tracks.withColumn(
    "parsed_value", from_json(col("value"), tracks_schema)
).select("parsed_value.*")
not_liked_tracks_parsed = not_liked_tracks.withColumn(
    "parsed_value", from_json(col("value"), tracks_schema)
).select("parsed_value.*")
trending_tracks_parsed = trending_tracks.withColumn(
    "parsed_value", from_json(col("value"), tracks_schema)
).select("parsed_value.*")

In [16]:
liked_tracks_parsed.printSchema()
print(f"Tracks: {liked_tracks_parsed.show(1)}")

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- popularity: float (nullable = true)

+--------------------+--------------------+--------------------+-----------+----------+
|                  id|                name|             artists|duration_ms|popularity|
+--------------------+--------------------+--------------------+-----------+----------+
|4Z4i631BesV0P6LTv...|Talk to Me You'll...|[{"external_urls"...|     417099|      55.0|
+--------------------+--------------------+--------------------+-----------+----------+
only showing top 1 row

Tracks: None


In [17]:
features_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("danceability", FloatType(), True),
        StructField("energy", FloatType(), True),
        StructField("key", IntegerType(), True),
        StructField("loudness", FloatType(), True),
        StructField("mode", IntegerType(), True),
        StructField("speechiness", FloatType(), True),
        StructField("acousticness", FloatType(), True),
        StructField("instrumentalness", FloatType(), True),
        StructField("liveness", FloatType(), True),
        StructField("valence", FloatType(), True),
        StructField("tempo", FloatType(), True),
    ]
)

features_parsed = features.withColumn(
    "parsed_value", from_json(col("value"), features_schema)
).select("parsed_value.*")

In [18]:
features_parsed.printSchema()
print(f"Features: {features_parsed.count()}")
print(f"Features: {features_parsed.show(1)}")

root
 |-- id: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- tempo: float (nullable = true)

Features: 200
+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+
|                  id|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|
+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+
|4Z4i631BesV0P6LTv...|       0.741| 0.619|  1| -11.366|   0|     0.0514|       0.661|           0.674|   0.125|  0.146|126.00

In [19]:
joined_liked_tracks = liked_tracks_parsed.join(
    features_parsed, liked_tracks_parsed.id == features_parsed.id
).drop(features_parsed.id)
joined_not_liked_tracks = not_liked_tracks_parsed.join(
    features_parsed, not_liked_tracks_parsed.id == features_parsed.id
).drop(features_parsed.id)
joined_trending_tracks = trending_tracks_parsed.join(
    features_parsed, trending_tracks_parsed.id == features_parsed.id
).drop(features_parsed.id)

In [20]:
joined_liked_tracks = joined_liked_tracks.withColumn("favorite", lit(1))
joined_not_liked_tracks = joined_trending_tracks.withColumn("favorite", lit(0))

In [21]:
combined_tracks = joined_liked_tracks.union(joined_not_liked_tracks)

In [22]:
combined_tracks = combined_tracks.orderBy(rand())
X_train, X_test = combined_tracks.randomSplit([0.8, 0.2])

In [23]:
assembler = VectorAssembler(
    inputCols=[
        "duration_ms",
        "popularity",
        "danceability",
        "energy",
        "key",
        "loudness",
        "mode",
        "speechiness",
        "acousticness",
        "instrumentalness",
        "liveness",
        "valence",
        "tempo",
    ],
    outputCol="features",
)
assembled_train = assembler.transform(X_train)
assembled_test = assembler.transform(X_test)
assembled_trending = assembler.transform(joined_trending_tracks)

In [24]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(assembled_train)
scaled_train = scalerModel.transform(assembled_train)

In [25]:
random_forest = RandomForestClassifier(
    featuresCol="features", labelCol="favorite", numTrees=100, maxDepth=5, maxBins=32
)
model = random_forest.fit(scaled_train)

In [26]:
predictions = model.transform(assembled_test)
print(f"Precision: {predictions.filter(predictions.favorite == predictions.prediction).count() / predictions.count()}")

Precision: 0.8260869565217391


In [27]:
def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None
    
ith = udf(ith_, FloatType())

In [28]:
recommendations = model.transform(assembled_trending).filter(ith(col("probability"), lit(1)) > 0.3).orderBy(col("probability").desc()).limit(10)

In [29]:
recommendations.show()



+--------------------+--------------------+--------------------+-----------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+--------------------+----------+
|                  id|                name|             artists|duration_ms|popularity|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+-----------+----------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+--------------------+----------+
|3C1ejh3RlOsfBOkwA...|        Nehéz vagyok|[{"external_urls"...|     180750|      36.0|       0.482| 0.706|  0|  -5.918|   1|     0.0952|       0.144|             0.0|  0.0851|  0.482|203.917|[180750

                                                                                

In [30]:
recommendations_json = recommendations.select('id', 'name', 'artists', 'probability').toJSON().collect()

                                                                                

In [31]:
output_json = {}
for track in recommendations_json:
    track = json.loads(track)
    artists = [artist["name"] for artist in json.loads(track["artists"])]
    output_json[track["id"]] = {
        "name": track["name"],
        "artists": artists,
        "probability": track["probability"],
    }

In [32]:
df = spark.createDataFrame(list(output_json.items()), ["id", "data"])

In [36]:
df.write.format('org.apache.spark.sql.redis').option('table', 'recommendations').option('key.column', 'id').save()

                                                                                