### Membuat Spark Context untuk aplikasi Spotiplay

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder \
    .appName("SpotifyStreaming") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.4.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5") \
    .getOrCreate()

In [4]:
print(spark.sparkContext.getConf().get('spark.jars'))

file:///home/bigdata/.ivy2/jars/org.mongodb.spark_mongo-spark-connector_2.11-2.4.1.jar,file:///home/bigdata/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.4.5.jar,file:///home/bigdata/.ivy2/jars/org.mongodb_mongo-java-driver-3.10.2.jar,file:///home/bigdata/.ivy2/jars/org.apache.kafka_kafka-clients-2.0.0.jar,file:///home/bigdata/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/bigdata/.ivy2/jars/org.lz4_lz4-java-1.4.0.jar,file:///home/bigdata/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.7.3.jar,file:///home/bigdata/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar


## Latest Played Streaming

In [5]:
# Define the schema for the streaming DataFrame
schema = StructType([
    StructField("track_id", StringType()),
    StructField("track_name", StringType()),
    StructField("artist_name", StringType()),
    StructField("album_image", StringType()),
    StructField("danceability", DoubleType()),
    StructField("energy", DoubleType()),
    StructField("loudness", DoubleType()),
    StructField("mode", DoubleType()),
    StructField("speechiness", DoubleType()),
    StructField("acousticness", DoubleType()),
    StructField("instrumentalness", DoubleType()),
    StructField("liveness", DoubleType()),
    StructField("valence", DoubleType()),
    StructField("tempo", DoubleType())
])

# Create the streaming DataFrame
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "latest_tracks") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .selectExpr("data.*")
    
#add timestamp
df = df.withColumn("timestamp", current_timestamp())
    

# Print the streaming DataFrame to the console
# query = df.writeStream \
#     .outputMode("append") \
#     .format("console") \
#     .start()
    
df.createOrReplaceTempView("songs")
songs = spark.sql("SELECT * FROM songs")
# query = songs.writeStream \
#     .format("memory") \
#     .queryName("latest_played") \
#     .outputMode("append") \
#     .start()
query = songs.writeStream \
    .format("parquet") \
    .option("path", "/home/bigdata/spotiplay/data") \
    .option("checkpointLocation", "/home/bigdata/spotiplay/checkpoint") \
    .queryName("latest_played") \
    .outputMode("append") \
    .start()

query.awaitTermination()

KeyboardInterrupt: 

### Get From parquet

In [6]:
df = spark.read.parquet("/home/bigdata/spotiplay/data")
df.createOrReplaceTempView("songs")
df = df.sort("timestamp", ascending=False)
df.show()

KeyboardInterrupt: 

In [None]:
mongo_uri = "mongodb://localhost:27017/spotiplay.tracks"
mongo_conf = {"spark.mongodb.input.uri": mongo_uri}
df_tracks = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
             .options(**mongo_conf) \
             .load()
             
df_tracks.createOrReplaceTempView("tracks")
df_tracks.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+--------+--------------+--------------------+--------------------+--------------------+--------+--------------------+----------+--------------------+------------+-----+--------------------+
|                 _id|               album|             artists|      audio_features|   available_markets|disc_number|duration_ms|explicit|  external_ids|       external_urls|                href|                  id|is_local|                name|popularity|         preview_url|track_number| type|                 uri|
+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+--------+--------------+--------------------+--------------------+--------------------+--------+--------------------+----------+--------------------+------------+-----+--------------------+
|[640c321fb08f031d...|[single, single, .

In [None]:
from pyspark.sql.functions import col

df_features = df_tracks.select(col("name"), 
                               col("artists").getItem("name").alias("artists"),
                               col("audio_features").getItem("danceability").alias("danceability"), 
                               col("audio_features").getItem("energy").alias("energy"), 
                               col("audio_features").getItem("loudness").alias("loudness"), 
                               col("audio_features").getItem("speechiness").alias("speechiness"), 
                               col("audio_features").getItem("acousticness").alias("acousticness"), 
                               col("audio_features").getItem("instrumentalness").alias("instrumentalness"), 
                               col("audio_features").getItem("liveness").alias("liveness"), 
                               col("audio_features").getItem("valence").alias("valence"), 
                               col("audio_features").getItem("tempo").alias("tempo"), )

df_features.show()

+--------------------+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+
|                name|             artists|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|
+--------------------+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+
|                 urs|              [NIKI]|        0.72| 0.415|  -9.527|      0.354|       0.834|         1.01E-6|  0.0893|  0.235|  94.88|
|High School in Ja...|              [NIKI]|       0.873| 0.488|  -8.285|     0.0413|       0.531|         1.32E-4|   0.153|  0.462|119.983|
|   Sick Little Games| [First and Forever]|       0.593| 0.901|  -3.695|     0.0792|     0.00492|         1.58E-5|  0.0805|  0.532| 96.564|
|          Drive Safe|        [Rich Brian]|       0.635| 0.312|  -8.134|     0.0265|       0.635|             0.0|   0.138|  0.414| 98.017|
|La La Lost You - ..

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler=VectorAssembler(inputCols=[
 'danceability',
 'energy',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo'], outputCol='features')
assembled_data=assembler.setHandleInvalid("skip").transform(df_features)

In [None]:
from pyspark.ml.feature import StandardScaler
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(assembled_data)
df_scaled=data_scale.transform(assembled_data)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')


KMeans_algo=KMeans(featuresCol='standardized', k=7)
    
KMeans_fit=KMeans_algo.fit(df_scaled)
    
output_df =KMeans_fit.transform(df_scaled)

pipeline = Pipeline(stages=[assembler, scale, KMeans_algo])

In [None]:
output_df.show()

+--------------------+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+----------+
|                name|             artists|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|            features|        standardized|prediction|
+--------------------+--------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+----------+
|                 urs|              [NIKI]|        0.72| 0.415|  -9.527|      0.354|       0.834|         1.01E-6|  0.0893|  0.235|  94.88|[0.72,0.415,-9.52...|[4.81454042756875...|         4|
|High School in Ja...|              [NIKI]|       0.873| 0.488|  -8.285|     0.0413|       0.531|         1.32E-4|   0.153|  0.462|119.983|[0.873,0.488,-8.2...|[5.83763026842711...|         2|
|   Sick Little Games| [First and F

In [None]:
df.show()

+--------------------+--------------------+-------+--------------------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+--------------------+
|            track_id|                name|artists|         album_image|danceability|energy|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|           timestamp|
+--------------------+--------------------+-------+--------------------+------------+------+--------+----+-----------+------------+----------------+--------+-------+-------+--------------------+
|7c98gah3Qah9o76kg...|        Bahasa Kalbu|   null|https://i.scdn.co...|        0.22| 0.284|  -9.985| 1.0|     0.0297|       0.858|         1.79E-5|   0.291|  0.155| 83.837|2023-03-13 13:57:...|
|5WqCYYanb5HoIqJei...|Kyokuto no hitsuj...|   null|https://i.scdn.co...|       0.633| 0.859|  -7.596| 1.0|      0.181|     1.14E-4|         0.00375|   0.102|  0.632|130.012|2023-03-13 13:57:...|
|3zIm0KzxRoGWN6hse...|Lo-

In [None]:
#rename df.track_name to df.name
df = df.withColumnRenamed("track_name", "name")
df = df.withColumnRenamed("artist_name", "artists")

cols = ['name', 'artists', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms']

cols_drop = ['track_id','album_image','timestamp','mode']

df_new = df.drop(*cols_drop)

df_new.show()

+--------------------+-------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+
|                name|artists|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|
+--------------------+-------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+
|Lo-fi shojo wa ky...|   null|       0.628| 0.436| -12.218|     0.0335|        0.97|           0.828|   0.258|   0.29| 79.986|
|Kyokuto no hitsuj...|   null|       0.633| 0.859|  -7.596|      0.181|     1.14E-4|         0.00375|   0.102|  0.632|130.012|
|        Bahasa Kalbu|   null|        0.22| 0.284|  -9.985|     0.0297|       0.858|         1.79E-5|   0.291|  0.155| 83.837|
|        Usai Di Sini|   null|       0.445| 0.201| -11.244|     0.0391|       0.965|             0.0|  0.0957|  0.547|121.468|
|  Terjebak Nostalgia|   null|       0.251| 0.268|  -8.433|     0.0358|       0.915|             0.0|   0.151| 

In [None]:
df_latest_song = pipeline.fit(df_new).transform(df_new)

df_predict = df_latest_song.select("name", "artists", "prediction")

df_predict.show()

+--------------------+-------+----------+
|                name|artists|prediction|
+--------------------+-------+----------+
|        Bahasa Kalbu|   null|         0|
|Kyokuto no hitsuj...|   null|         6|
|Lo-fi shojo wa ky...|   null|         3|
|  Terjebak Nostalgia|   null|         0|
|           Melangkah|   null|         5|
|Apalah (Arti Menu...|   null|         2|
|          Jatuh Hati|   null|         2|
|          Kali Kedua|   null|         2|
|        Usai Di Sini|   null|         2|
|            Berdamai|   null|         2|
|             Kembali|   null|         0|
|Jangan Cepat Berlalu|   null|         2|
|     Mantan Terindah|   null|         2|
|                 LDR|   null|         2|
|       Love & Let Go|   null|         2|
|             3:03 PM|   null|         1|
|             2:23 AM|   null|         5|
|   Nyawa dan Harapan|   null|         2|
|          Biarkanlah|   null|         2|
|     Anganku Anganmu|   null|         4|
+--------------------+-------+----

In [None]:
#show the songs in the same cluster
output_df.filter(output_df.prediction == 2).limit(5).show()

+--------------------+-------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+----------+
|                name|            artists|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|            features|        standardized|prediction|
+--------------------+-------------------+------------+------+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+----------+
|High School in Ja...|             [NIKI]|       0.873| 0.488|  -8.285|     0.0413|       0.531|         1.32E-4|   0.153|  0.462|119.983|[0.873,0.488,-8.2...|[5.83763026842711...|         2|
|   Sick Little Games|[First and Forever]|       0.593| 0.901|  -3.695|     0.0792|     0.00492|         1.58E-5|  0.0805|  0.532| 96.564|[0.593,0.901,-3.6...|[3.96530899103926...|         2|
|Introvert (feat. ...| [Rich Brian, Joji

In [None]:
from pyspark.sql.functions import to_json, struct

# Convert data frame to JSON string
df_predict_json = df_predict.select(to_json(struct("*")).alias("value"))


# Write data frame to Kafka topic
df_predict_json.write.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "recommendations") \
    .save()

In [None]:
from pyspark.mllib.linalg import Vectors
#import to_json
from pyspark.sql.functions import to_json, struct
def euclidean_distance(x, y):
    distance = np.linalg.norm(np.array(x) - np.array(y))
    distance_in_int = int(distance * 100000)
    return distance_in_int

def spotify_recommendations(row,num_recommendations=5):
    # create a list to store the recommendations
    recommendations = []
    latest_song_name = row['name']
    latest_song_cluster = row['prediction']
    latest_song_features = row['standardized']

    # filter the music_database RDD to only include songs in the same cluster as the latest_song
    music_database_filtered = music_database.filter(lambda x: x[2] == latest_song_cluster)

    # calculate the Euclidean distance between latest_song and each song in music_database_filtered
    music_database_distances = music_database_filtered.map(lambda x: (x[0], euclidean_distance(x[1], latest_song_features)))

    # sort music_database_distances by distance in ascending order
    music_database_sorted = music_database_distances.sortBy(lambda x: x[1], ascending=True)
    
    #remove duplicate songs from the list
    music_database_sorted = music_database_sorted.reduceByKey(lambda x, y: x)

    # get the top 5 songs
    recommendations_temp = music_database_sorted.take(num_recommendations)

    # add the recommendations to the recommendations list
    recommendations.extend([(recommendation[0], recommendation[1]) for recommendation in recommendations_temp])

    return recommendations

music_database = output_df.rdd.map(lambda x: (x['name'], x['standardized'], x['prediction']))
# foreachBatch() will apply the function to each batch of data
random_df = df_latest_song.toPandas()

sample_df = random_df.sample(n=1)

for index, row in sample_df.iterrows():
    random_row = row
    print(random_row)
    recommendations = spotify_recommendations(random_row,5)
    print(recommendations)
    recommendations_df = spark.createDataFrame(recommendations, ['name', 'distance'])
    
    kafka_df = recommendations_df.select(to_json(struct("*")).alias("value"))
    kafka_df.write.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "recommendations") \
        .save()

name                                                         Berdamai
artists                                                          None
danceability                                                    0.476
energy                                                          0.281
loudness                                                       -7.855
speechiness                                                    0.0288
acousticness                                                    0.913
instrumentalness                                             4.09e-05
liveness                                                        0.158
valence                                                         0.377
tempo                                                         137.114
features            [0.476, 0.281, -7.855, 0.0288, 0.913, 4.09e-05...
standardized        [3.3552678421555773, 1.6047389718535139, -4.12...
prediction                                                          2
Name: 9, dtype: obje