# Setup

In [2]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType, ArrayType, DoubleType, FloatType
from pyspark.sql.functions import from_json, col, udf, row_number, lit
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from track_separator.download_mp3 import download_mp3
from track_separator.spleeter import separate_track
from scraper.script import scrape_chords
from reccomendation_system.reccomendation import *

In [None]:
# Single Spark session shared by every streaming query in this notebook.
spark = (
	SparkSession.builder
	.appName("kCHORDS")
	.getOrCreate()
)

# Driver log verbosity for the underlying SparkContext.
spark.sparkContext.setLogLevel("INFO")

In [4]:
# Raw Kafka stream of song requests; each record's `value` carries the JSON
# request payload parsed further below with SONG_REQUEST_SCHEMA.
df_song_requests_raw = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafkaServer:9092",
        "subscribe": "songRequests",
    })
    .load()
)

In [5]:

# Shape of the JSON payload carried by each message on the songRequests topic.
# Every field is nullable.
SONG_REQUEST_SCHEMA = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Yt_Id", StringType()),
        ("Yt_Link", StringType()),
        ("UgChords_Link", StringType()),
        ("Request_Date", TimestampType()),
    )
])

In [6]:
def print_df(df):
	"""Debug helper: stream ``df`` to the console in append mode.

	Blocks until the started query terminates, so the calling cell does not
	return while the query runs.

	Args:
		df: a streaming DataFrame.
	"""
	console_writer = df.writeStream.outputMode("append").format("console")
	console_writer.start().awaitTermination()

# Lettura da topic Input

In [7]:
# Decode each Kafka value as a JSON song request and flatten it into columns.
# A value that fails to parse yields a row of nulls.
df_song_requests = (
	df_song_requests_raw
	.selectExpr("CAST(value AS STRING)")
	.select(from_json(col("value"), SONG_REQUEST_SCHEMA).alias("data"))
	.select("data.*")
)

# Scraping accordi

In [8]:
@udf(returnType=StringType())
def scrape_chords_udf(chords_link):
	"""Scrape the chords page at ``chords_link`` and return ``scrape_chords``'s string result.

	The result is later parsed with ``from_json``, so it is expected to be a
	JSON document. A null link is possible, e.g. from a request whose JSON
	failed to parse. For a null link this returns null instead of calling the
	scraper with ``None``: an exception raised inside a Python UDF fails the
	whole streaming query.
	"""
	if chords_link is None:
		return None
	return scrape_chords(chords_link)

df_chords_raw = df_song_requests.withColumn("scraped_data_raw", scrape_chords_udf(col("UgChords_Link")))

In [9]:
def _nullable_struct(*fields):
    """Build a StructType from (name, data_type) pairs, marking every field nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in fields])


# Nested types of the JSON produced by the chords scraper.
ArtistType = _nullable_struct(
    ('name', StringType()),
    ('profile_link', StringType()),
)

AuthorType = _nullable_struct(
    ('profile_link', StringType()),
    ('username', StringType()),
)

CommentsType = ArrayType(_nullable_struct(
    ('author', StringType()),
    ('message', StringType()),
    ('date', TimestampType()),
    ('upvote', IntegerType()),
))

# Element type shared by the "more versions" and "related tabs" lists.
tab_schema = ArrayType(_nullable_struct(
    ('link', StringType()),
    ('name', StringType()),
    ('stars', IntegerType()),
))
MoreVersionType = tab_schema
RelatedTabsType = tab_schema

CHORDS_SCHEMA = _nullable_struct(
    ('added_favorites', IntegerType()),
    ('artist', ArtistType),
    ('author', AuthorType),
    ('capo_position', StringType()),
    ('chords', StringType()),
    ('comments', CommentsType),
    ('difficulty', StringType()),
    ('key', StringType()),
    ('more_versions', MoreVersionType),
    ('name', StringType()),
    ('related_tabs', RelatedTabsType),
    ('stars', IntegerType()),
    ('tuning', StringType()),
    ('url', StringType()),
    ('views', IntegerType()),
)

# Parse the scraper's raw JSON string into a typed struct, appended after the raw column.
df_chords = df_chords_raw.withColumn(
    "scraped_data", from_json(col("scraped_data_raw"), CHORDS_SCHEMA)
)

df_chords.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Yt_Link: string (nullable = true)
 |-- UgChords_Link: string (nullable = true)
 |-- Request_Date: timestamp (nullable = true)
 |-- scraped_data_raw: string (nullable = true)
 |-- scraped_data: struct (nullable = true)
 |    |-- added_favorites: integer (nullable = true)
 |    |-- artist: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profile_link: string (nullable = true)
 |    |-- author: struct (nullable = true)
 |    |    |-- profile_link: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- capo_position: string (nullable = true)
 |    |-- chords: string (nullable = true)
 |    |-- comments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- message: string (nullable = true)
 |    |    |    |-- date: timestamp (nullable = true)
 |    |    |    |-- upvote: integer (null

# Estrazione audio tracks

In [10]:
@udf(returnType=StringType())
def download_mp3_udf(yt_link, request_id):
	"""Download the audio of ``yt_link`` with ``download_mp3`` and return its string result.

	The result is stored as ``Hdfs_Song_Path``, presumably the HDFS path of the
	downloaded MP3 (TODO confirm against ``download_mp3``). Rows with a null
	link or id, e.g. from a request whose JSON failed to parse, return null
	instead of passing ``None`` to the downloader. An exception raised inside a
	Python UDF would fail the whole streaming query.
	"""
	if yt_link is None or request_id is None:
		return None
	return download_mp3(yt_link, request_id)


df_audio = df_song_requests.withColumn("Hdfs_Song_Path", download_mp3_udf(col("Yt_Link"), col("Yt_Id")))
df_audio.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Yt_Link: string (nullable = true)
 |-- UgChords_Link: string (nullable = true)
 |-- Request_Date: timestamp (nullable = true)
 |-- Hdfs_Song_Path: string (nullable = true)



In [11]:
@udf(returnType=StringType())
def separate_track_udf(hdfs_song_path, request_id):
	"""Run ``separate_track`` on the downloaded song and return its string result.

	The result is stored as ``Hdfs_Tracks_Path``, presumably where the
	separated stems were written (TODO confirm against ``separate_track``).
	When the song path is null (no download result) or the id is null, this
	returns null instead of calling the separator with ``None``. An exception
	raised inside a Python UDF would fail the whole streaming query.
	"""
	if hdfs_song_path is None or request_id is None:
		return None
	return separate_track(hdfs_song_path, request_id)


df_separated_tracks = df_audio.withColumn("Hdfs_Tracks_Path", separate_track_udf(col("Hdfs_Song_Path"), col("Yt_Id")))

df_separated_tracks \
	.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Yt_Link: string (nullable = true)
 |-- UgChords_Link: string (nullable = true)
 |-- Request_Date: timestamp (nullable = true)
 |-- Hdfs_Song_Path: string (nullable = true)
 |-- Hdfs_Tracks_Path: string (nullable = true)



# Sistema di Raccomandazione

In [12]:
# Continuous audio features of each reference song, in file column order.
_dataset_audio_features = (
    "popularity", "danceability", "energy", "loudness", "speechiness",
    "acousticness", "instrumentalness", "liveness", "valence", "tempo",
)

# Genres one-hot encoded as "genre_<name>" columns, in file column order.
_dataset_genres = (
    'acoustic', 'alt-rock', 'alternative', 'ambient', 'blues', 'classical', 'country', 'dance',
    'disco', 'electro', 'electronic', 'folk', 'funk', 'gospel', 'hip-hop', 'house', 'indie', 'jazz',
    'latin', 'metal', 'pop', 'rock', 'soul', 'synth-pop', 'techno', 'trance',
)

RECCOMENDATION_DATASET_SCHEMA = StructType(
    [
        StructField("index", IntegerType(), True),
        StructField("track_id", StringType(), True),
    ]
    + [StructField(feature, DoubleType(), True) for feature in _dataset_audio_features]
    + [StructField(f"genre_{genre}", DoubleType(), True) for genre in _dataset_genres]
)

# Static reference dataset (tab-separated with a header row). Malformed fields
# are read as null (PERMISSIVE mode) rather than failing the read.
song_dataset = (
    spark.read
    .schema(RECCOMENDATION_DATASET_SCHEMA)
    .options(mode="PERMISSIVE", delimiter="\t", header="true")
    .csv("reccomendation_system/SONG_DATASET.csv")
)

song_dataset.show()

+-----+--------------------+------------------+------------+------+------------------+-----------+------------+----------------+--------+-------+-------------------+--------------+--------------+-----------------+-------------+-----------+---------------+-------------+-----------+-----------+-------------+----------------+----------+----------+------------+-------------+-----------+-----------+----------+-----------+-----------+---------+----------+----------+---------------+------------+------------+
|index|            track_id|        popularity|danceability|energy|          loudness|speechiness|acousticness|instrumentalness|liveness|valence|              tempo|genre_acoustic|genre_alt-rock|genre_alternative|genre_ambient|genre_blues|genre_classical|genre_country|genre_dance|genre_disco|genre_electro|genre_electronic|genre_folk|genre_funk|genre_gospel|genre_hip-hop|genre_house|genre_indie|genre_jazz|genre_latin|genre_metal|genre_pop|genre_rock|genre_soul|genre_synth-pop|genre_techno|

In [13]:
# Identifier columns carry no audio/genre signal, so they are excluded from the
# feature matrix compared with cosine similarity.
columns_to_drop = ['index', 'track_id']
song_dataset_cosine_sim = song_dataset.select(
    *[name for name in song_dataset.columns if name not in columns_to_drop]
)

song_dataset_cosine_sim.show()

+------------------+------------+------+------------------+-----------+------------+----------------+--------+-------+-------------------+--------------+--------------+-----------------+-------------+-----------+---------------+-------------+-----------+-----------+-------------+----------------+----------+----------+------------+-------------+-----------+-----------+----------+-----------+-----------+---------+----------+----------+---------------+------------+------------+
|        popularity|danceability|energy|          loudness|speechiness|acousticness|instrumentalness|liveness|valence|              tempo|genre_acoustic|genre_alt-rock|genre_alternative|genre_ambient|genre_blues|genre_classical|genre_country|genre_dance|genre_disco|genre_electro|genre_electronic|genre_folk|genre_funk|genre_gospel|genre_hip-hop|genre_house|genre_indie|genre_jazz|genre_latin|genre_metal|genre_pop|genre_rock|genre_soul|genre_synth-pop|genre_techno|genre_trance|
+------------------+------------+------+

In [14]:
# Ordered feature columns that make up each vector compared by cosine similarity.
dataset_columns = list(song_dataset_cosine_sim.columns)
print(dataset_columns)

['popularity', 'danceability', 'energy', 'loudness', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'genre_acoustic', 'genre_alt-rock', 'genre_alternative', 'genre_ambient', 'genre_blues', 'genre_classical', 'genre_country', 'genre_dance', 'genre_disco', 'genre_electro', 'genre_electronic', 'genre_folk', 'genre_funk', 'genre_gospel', 'genre_hip-hop', 'genre_house', 'genre_indie', 'genre_jazz', 'genre_latin', 'genre_metal', 'genre_pop', 'genre_rock', 'genre_soul', 'genre_synth-pop', 'genre_techno', 'genre_trance']


In [15]:
# Genre names recovered from the one-hot "genre_<name>" columns of the dataset.
# Match on the prefix only. A substring test plus replace() would also pick up
# and strip "genre_" occurring elsewhere in a column name.
_GENRE_PREFIX = 'genre_'
genres = [name[len(_GENRE_PREFIX):] for name in song_dataset.columns if name.startswith(_GENRE_PREFIX)]
genres

['acoustic',
 'alt-rock',
 'alternative',
 'ambient',
 'blues',
 'classical',
 'country',
 'dance',
 'disco',
 'electro',
 'electronic',
 'folk',
 'funk',
 'gospel',
 'hip-hop',
 'house',
 'indie',
 'jazz',
 'latin',
 'metal',
 'pop',
 'rock',
 'soul',
 'synth-pop',
 'techno',
 'trance']

In [16]:
# Pair each request's downloaded MP3 path with the artist name scraped from its
# chords page. This is the input of the recommendation step.
# NOTE(review): this is a stream-stream inner join without watermarks. Spark
# therefore keeps join state for every request indefinitely, and a repeated
# Yt_Id matches every earlier row on the other side. Both sides derive from
# df_song_requests, so computing both columns in one pipeline would avoid the
# join. TODO consider.
columns = [
    'Yt_Id',
    'Hdfs_Song_Path',
    col('scraped_data.artist.name').alias('artist_name'),
]
df_reccomendation = (
    df_audio
    .join(df_chords, on=["Yt_Id"], how="inner")
    .select(*columns)
)

df_reccomendation.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Hdfs_Song_Path: string (nullable = true)
 |-- artist_name: string (nullable = true)



In [17]:
def reccomend_songs(batch_df, batch_id):
	"""foreachBatch callback: compute and publish recommendations for each request in a micro-batch.

	For every row of ``batch_df`` it does the following:
	- builds the requested song's feature row with ``get_song_df``;
	- scores every reference song by cosine similarity;
	- keeps the 5 most similar distinct tracks of the song's genre;
	- resolves them with ``get_recommendation`` and publishes the result with
	  ``write_on_kafka``.

	Args:
		batch_df: micro-batch with columns ``Yt_Id``, ``Hdfs_Song_Path`` and
			``artist_name``.
		batch_id: micro-batch id passed by Spark; not used.
	"""
	rows = batch_df.collect()
	if not rows:
		# Nothing to do; also skips collecting the reference dataset.
		return

	# The reference dataset is static, so collect it once per batch instead of
	# once per request. The feature matrix is derived from the same collected
	# frame, which keeps similarity scores row-aligned with track_id.
	dataset_full = song_dataset.toPandas()
	dataset_features = dataset_full[song_dataset_cosine_sim.columns]
	# NOTE(review): nulls from the PERMISSIVE CSV read become NaN here, which
	# cosine_similarity rejects. Confirm the dataset has none.

	for row in rows:
		video_id = row['Yt_Id']
		request_id = row['Yt_Id']
		hdfs_mp3_file_path = row['Hdfs_Song_Path']
		artist_name = row['artist_name']

		# Feature row of the requested song, expected in the same form as the dataset:
		# {popularity, danceability, energy, loudness, speechiness, acousticness, instrumentalness, liveness, valence, tempo, genre_...}
		song_df = get_song_df(video_id, request_id, hdfs_mp3_file_path, artist_name, genres)

		# cosine_similarity compares features by position, so reorder the song's
		# columns to the dataset's order before scoring.
		song_features = song_df[list(dataset_features.columns)]
		similarity_scores = cosine_similarity(dataset_features, song_features)

		# One score per dataset row; song_df is expected to hold a single row.
		feat_vec = dataset_full.assign(similarity_score=similarity_scores[:, 0])

		# Keep the 5 most similar distinct tracks of the song's genre. If the
		# genre has no matching column, rank across all genres instead of
		# raising KeyError, which would fail the streaming query.
		artist_genre = extract_genre_from_df(song_df)
		print("Genre: ", artist_genre)
		genre_column = f'genre_{artist_genre}'
		if genre_column in feat_vec.columns:
			candidates = feat_vec.loc[feat_vec[genre_column] == 1]
		else:
			print(f"No column '{genre_column}' in the dataset, ranking across all genres")
			candidates = feat_vec
		top_similarities_ids = candidates \
			.sort_values(by='similarity_score', ascending=False) \
			.drop_duplicates(subset='track_id') \
			.head(5)['track_id']

		# Keep only the song's statistics (drop the genre_ one-hot columns).
		song_stats = song_df.loc[:, ~song_df.columns.str.startswith('genre_')]

		# Look up the recommended songs by id on Spotify.
		recommendations = get_recommendation(request_id, song_stats, top_similarities_ids)

		# Publish the found songs plus the requested song's statistics to Kafka.
		write_on_kafka(recommendations)

df_recommended_songs = df_reccomendation \
	.writeStream \
	.foreachBatch(reccomend_songs)\
	.start()

In [18]:
# Statistics of the requested song, as read back from the "recommendations" topic.
SONG_STATS_SCHEMA = StructType([
    StructField(stat, DoubleType(), True)
    for stat in (
        'danceability', 'energy', 'loudness', 'speechiness', 'acousticness',
        'instrumentalness', 'liveness', 'valence', 'tempo', 'popularity',
    )
])

# A single recommended track.
_recommended_song_type = StructType([
    StructField('yt_link', StringType(), True),
    StructField('name', StringType(), True),
    StructField('artists', ArrayType(StringType()), True),
    StructField('album', StringType(), True),
])
RECOMMENDED_SONGS_SCHEMA = ArrayType(_recommended_song_type)

# Full message on the "recommendations" topic, keyed by the request's Yt_Id.
RECOMMENDATION_SCHEMA = StructType([
    StructField("Yt_Id", StringType(), True),
    StructField('Song_Stats', SONG_STATS_SCHEMA, True),
    StructField('Recommends', RECOMMENDED_SONGS_SCHEMA, True),
])

In [19]:
# Recommendations read back from Kafka, starting at the earliest offset. They are
# presumably produced by write_on_kafka in the recommendation step; TODO confirm the topic.
df_recommendation_response_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafkaServer:9092")
    .option("subscribe", "recommendations")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode each value as JSON and flatten it into top-level columns.
df_recommendation_response = (
    df_recommendation_response_raw
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), RECOMMENDATION_SCHEMA).alias("data"))
    .select("data.*")
)

df_recommendation_response.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Song_Stats: struct (nullable = true)
 |    |-- danceability: double (nullable = true)
 |    |-- energy: double (nullable = true)
 |    |-- loudness: double (nullable = true)
 |    |-- speechiness: double (nullable = true)
 |    |-- acousticness: double (nullable = true)
 |    |-- instrumentalness: double (nullable = true)
 |    |-- liveness: double (nullable = true)
 |    |-- valence: double (nullable = true)
 |    |-- tempo: double (nullable = true)
 |    |-- popularity: double (nullable = true)
 |-- Recommends: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- yt_link: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- artists: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- album: string (nullable = true)



# Join: Chords - Track Separator - Recommendation System

In [20]:
# Combine chords, separated tracks and recommendations per request (Yt_Id).
# A join on ["Yt_Id"] deduplicates only the key. df_separated_tracks also
# carries Yt_Link, UgChords_Link and Request_Date, which df_chords already has.
# Left in, they would appear twice in the row and as duplicate keys in the JSON
# written to Kafka. So only the columns the track-separator branch adds are kept.
df_separated_tracks_only = df_separated_tracks.select("Yt_Id", "Hdfs_Song_Path", "Hdfs_Tracks_Path")

df_merged = df_chords \
    .join(df_separated_tracks_only, on=["Yt_Id"], how="inner") \
    .join(df_recommendation_response, on=["Yt_Id"], how="inner") \
    .drop('scraped_data_raw')
df_merged.printSchema()

root
 |-- Yt_Id: string (nullable = true)
 |-- Yt_Link: string (nullable = true)
 |-- UgChords_Link: string (nullable = true)
 |-- Request_Date: timestamp (nullable = true)
 |-- scraped_data: struct (nullable = true)
 |    |-- added_favorites: integer (nullable = true)
 |    |-- artist: struct (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- profile_link: string (nullable = true)
 |    |-- author: struct (nullable = true)
 |    |    |-- profile_link: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- capo_position: string (nullable = true)
 |    |-- chords: string (nullable = true)
 |    |-- comments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- message: string (nullable = true)
 |    |    |    |-- date: timestamp (nullable = true)
 |    |    |    |-- upvote: integer (nullable = true)
 |    |-- difficulty: string (nulla

# Scrittura nel topic di output

In [None]:
# Checkpoint directory dedicated to this query. The bare /tmp directory can be
# shared with other queries and processes that clobber its offset/commit logs.
# Note: a checkpoint previously written directly under /tmp will not be resumed.
SONGS_CHECKPOINT_DIR = "/tmp/checkpoints/songs"

# Publish each merged row as JSON to the "songs" topic, keyed by Yt_Id.
# Blocks until the query terminates.
df_merged \
    .selectExpr('cast(Yt_Id as string) as key', 'to_json(struct(*)) as value') \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafkaServer:9092") \
    .option("checkpointLocation", SONGS_CHECKPOINT_DIR) \
    .option("topic", "songs") \
    .start() \
    .awaitTermination()