# <center> Million Songs Analysis</center>

## Starting the Spark session

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook.
# Dynamic allocation is switched on, but initial/min/max executors are all set
# to 1 and each executor gets a single core.
spark_session = (
    SparkSession.builder
    .master("spark://host-192-168-1-153-ldsa:7077")
    .appName("Project_19")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "10m")
    .config("spark.dynamicAllocation.initialExecutors", 1)
    .config("spark.dynamicAllocation.minExecutors", 1)
    .config("spark.dynamicAllocation.maxExecutors", 1)
    .config("spark.executor.cores", 1)
    .getOrCreate()
)

# Low-level context, used further down for the RDD-based loading steps.
spark_context = spark_session.sparkContext

## Imports

In [None]:
import h5py
import io, time
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id, col, when

## Loading the data into RDDs (Skip)

In [None]:
""" This function turns a list of binary elements to a string with the element separated by the character ',' """
def list_bin_to_str(list_terms):
    """Join a sequence of byte strings into one comma-separated string.

    Args:
        list_terms: Iterable of items exposing ``.decode()`` (e.g. ``bytes``).

    Returns:
        The decoded items joined with ``','``; an empty string for empty input.
    """
    # str.join needs no trailing-separator trimming; the previous slice-based
    # trimming also removed the first character of the result.
    return ",".join(term.decode() for term in list_terms)

In [None]:
""" We prepare first a RDD containing tuples (song_information, artist_information) """
# Alternative input location on HDFS (disabled):
#rdd = spark_context.binaryFiles("hdfs://host-192-168-1-153-ldsa:9000/millionsongs/data/A/B/*")
# Read every file matched by the glob below; each record's second element holds the raw file bytes.
rdd = spark_context.binaryFiles("/mnt/ms/data/A/*/*")

# Open one HDF5 song file and pull out the song and artist fields of interest.
def f(x):
    """Turn one ``binaryFiles`` record into a ``(song, artist)`` pair of tuples.

    Args:
        x: Indexable record whose element ``x[1]`` holds the raw bytes of one
            HDF5 file (``x[0]`` is not used).

    Returns:
        ``(song, artist)`` where ``song`` holds 11 values and ``artist`` holds 6,
        in the positional order consumed by the songs and artists tables:

        * song   -> song_id, title, artist_name, artist_id, release_album,
          duration, hotness, danceability, loudness, tempo, year
        * artist -> id, name, location, hotness, terms, similar_artists

        NOTE(review): fields are picked by position inside the HDF5 compound
        rows; this assumes every input file shares the same field layout.
    """
    # The handle is named ``h5`` so it does not shadow this function's own name,
    # and the file is opened explicitly read-only since nothing is written.
    with h5py.File(io.BytesIO(x[1]), "r") as h5:
        # Only the first row of each "songs" dataset is used; read it once.
        meta = h5['metadata']["songs"][0]
        analysis = h5['analysis']["songs"][0]
        brainz = h5['musicbrainz']["songs"][0]

        # Keep at most the first 10 entries of each list, comma-joined.
        artist_terms = list_bin_to_str(h5['metadata']['artist_terms'][:10])
        similar_artists = list_bin_to_str(h5['metadata']['similar_artists'][:10])

        song = (meta[-3].decode(), meta[-2].decode(),
                meta[9].decode(), meta[4].decode(),
                meta[-6].decode(),
                float(analysis[3]), float(meta[-4]),
                float(analysis[2]), float(analysis[23]),
                float(analysis[-4]), int(brainz[1]))

        artist = (meta[4].decode(), meta[9].decode(),
                  meta[6].decode(), float(meta[3]),
                  artist_terms, similar_artists)

        return (song, artist)

# Parse every binary record into a (song, artist) pair of tuples.
rdd = rdd.map(f)

# Split the paired RDD into one RDD per table.
rdd_songs = rdd.map(lambda pair: pair[0])
rdd_artists = rdd.map(lambda pair: pair[1])

## Turning RDDs to Data frames (Skip)

<b> *   Table Songs </b>

In [None]:
# Schema of the songs table, one column per positional value of a song tuple.
fields = [
    StructField("song_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("artist_id", StringType(), True),
    StructField("release_album", StringType(), True),
    StructField("duration", FloatType(), True),
    StructField("hotness", FloatType(), True),
    StructField("danceability", FloatType(), True),
    StructField("loudness", FloatType(), True),
    StructField("tempo", FloatType(), True),
    StructField("year", IntegerType()),
]
songs_schema = StructType(fields)

# Wrap the 11 values of each song tuple in a Row, build the table and preview it.
attributes = rdd_songs.map(lambda p: Row(*[p[i] for i in range(11)]))
df_songs = spark_session.createDataFrame(attributes, songs_schema)
df_songs.show()

<b> *   Table Artists </b>

In [None]:
# Schema of the artists table, one column per positional value of an artist tuple.
fields = [
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("hotness", FloatType(), True),
    StructField("terms", StringType(), True),
    StructField("similar_artists", StringType(), True),
]
artists_schema = StructType(fields)

# Wrap the 6 values of each artist tuple in a Row.
attributes = rdd_artists.map(lambda p: Row(*[p[i] for i in range(6)]))

# One artist tuple is produced per song file, so identical rows are dropped.
df_artists = (
    spark_session.createDataFrame(attributes, artists_schema)
    .dropDuplicates()
)
df_artists.show()

## Saving data frames in csv files (Skip)

<b> *   Table Songs </b>

In [None]:
# Write the songs table as CSV with a header row.
# NOTE(review): the loading section reads MySongs_all.csv from an hdfs:// URL
# under /team19, not from this path - confirm how the output gets there.
(
    df_songs.write
    .format('com.databricks.spark.csv')
    .option("header", "true")
    .save('/home/ubuntu/MySongs_all.csv')
)


<b> *   Table Artists </b>

In [None]:
# Write the artists table as CSV with a header row.
# NOTE(review): the loading section reads MyArtists_all.csv from an hdfs:// URL
# under /team19, not from this path - confirm how the output gets there.
(
    df_artists.write
    .format('com.databricks.spark.csv')
    .option("header", "true")
    .save('/home/ubuntu/MyArtists_all.csv')
)


In [None]:
# Quick sanity check: fetch the first 10 song rows to the driver and display them.
df_songs.take(10)

## Loading data frames from csv files

<b> *   Table Songs </b>

In [None]:
# Explicit schema for the songs CSV, so numeric columns are loaded as numbers.
fields = [StructField("song_id", StringType(), True), StructField("title", StringType(), True), 
          StructField("artist_name", StringType(), True), StructField("artist_id", StringType(), True),
          StructField("release_album", StringType(), True), 
          StructField("duration", FloatType(), True), StructField("hotness", FloatType(), True), 
          StructField("danceability", FloatType(), True), StructField("loudness", FloatType(), True), 
          StructField("tempo", FloatType(), True), StructField("year", IntegerType())]
songs_schema = StructType(fields)

# The schema must be handed to the reader: without it every CSV column is read
# as a string, which breaks the arithmetic on tempo/loudness/hotness/danceability
# and the `year != 0` check used further down.
# NOTE(review): the disabled HDFS path in the RDD-loading section uses port 9000
# while this URL uses 50070 - confirm this is the right port for hdfs:// access.
df_songs = spark_session.read\
    .option("header", "true")\
    .schema(songs_schema)\
    .csv('hdfs://192.168.1.153:50070/team19/MySongs_all.csv')

Let's take a look at the table:

In [None]:
# Print the column names and types, then display up to 10 rows.
df_songs.printSchema()
df_songs.limit(10).show()

<b> *   Table Artists </b>

In [None]:
# Explicit schema for the artists CSV.
fields = [
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("location", StringType(), True),
    StructField("hotness", FloatType(), True),
    StructField("terms", StringType(), True),
    StructField("similar_artists", StringType(), True),
]
artists_schema = StructType(fields)

# Load the artists table with that schema and mark it for caching,
# since it is reused by later cells.
df_artists = (
    spark_session.read
    .load(
        'hdfs://192.168.1.153:50070/team19/MyArtists_all.csv',
        format='com.databricks.spark.csv',
        header='true',
        schema=artists_schema,
    )
    .cache()
)

Let's take a look at this table as well:

In [None]:
# Print the column names and types, then display up to 10 rows.
df_artists.printSchema()
df_artists.limit(10).show()

# Functions

In [None]:
""" This function returns the song information in a visible way (as a string) """
def song_str(song):
    """Format a song for display as ``"title - artist (year)"``.

    Args:
        song: Mapping-like object (e.g. a Row) with the keys ``"title"``,
            ``"artist_name"`` and ``"year"``.

    Returns:
        ``"title - artist (year)"``, or ``"title - artist"`` when the year is
        ``0`` or missing (``None``).
    """
    year = song["year"]
    # The year column is nullable, so a missing year is treated like 0
    # instead of being rendered as "(None)".
    if year:
        return "{} - {} ({})".format(song["title"], song["artist_name"], year)
    return "{} - {}".format(song["title"], song["artist_name"])

In [None]:
""" This function asks the user for the title of a song and returns it from the database """
def ask_for_song():
    """Prompt for a song title until the user confirms a match from ``df_songs``.

    The typed text is matched as a substring of the title (SQL ``LIKE`` with
    ``%`` on both sides) and the first matching row is proposed. The user is
    asked again while nothing matches or while the proposal is rejected.

    Returns:
        The confirmed song row.
    """
    def _first_match(prompt):
        # Substring search, so an incomplete title is enough.
        pattern = "%" + input(prompt) + "%"
        return df_songs.filter(df_songs["title"].like(pattern)).first()

    while True:
        song = _first_match(" Title of the song : ")
        while song is None:
            print(" This song doesn't exist in the database")
            song = _first_match("Title of the song : ")
        answer = input(" Is your song {} (y/n) ? ".format(song_str(song)))
        if answer == 'y':
            break

    print(" The chosen song is : {} ".format(song_str(song)))
    return song

In [None]:
""" This function returns the songs which belong to the same artist as the input song(Row) """
def same_artist(song):
    """Return the other songs in ``df_songs`` by the artist of ``song``.

    Args:
        song: Row-like object with ``"artist_name"`` and ``"title"`` keys.

    Returns:
        DataFrame of songs with the same artist name and a different title.
    """
    by_artist = df_songs.filter(df_songs["artist_name"] == song["artist_name"])
    other_title = by_artist["title"] != song["title"]
    return by_artist.filter(other_title)
  

In [None]:
""" This function returns the songs, from the dataframe, which have similar tempo as the input song"""
def similar_tempo(song, dataframe, margin):
    """Select songs whose tempo lies within ``margin`` of the tempo of ``song``.

    Args:
        song: Row-like object with ``"tempo"`` and ``"title"`` keys.
        dataframe: DataFrame to search; needs ``tempo`` and ``title`` columns.
        margin: Half-width of the accepted tempo interval (bounds included).

    Returns:
        ``dataframe`` filtered to the tempo interval, excluding rows that share
        the title of ``song``.
    """
    lower, upper = song["tempo"] - margin, song["tempo"] + margin
    within_margin = dataframe["tempo"].between(lower, upper)
    other_title = dataframe["title"] != song["title"]
    return dataframe.filter(within_margin).filter(other_title)

In [None]:
""" This function returns the songs, from the dataframe, which have similar loudness as the input song """
def similar_loudness(song, dataframe, margin):
    """Select songs whose loudness lies within ``margin`` of that of ``song``.

    Args:
        song: Row-like object with ``"loudness"`` and ``"title"`` keys.
        dataframe: DataFrame to search; needs ``loudness`` and ``title`` columns.
        margin: Half-width of the accepted loudness interval (bounds included).

    Returns:
        ``dataframe`` filtered to the loudness interval, excluding rows that
        share the title of ``song``.
    """
    lower, upper = song["loudness"] - margin, song["loudness"] + margin
    within_margin = dataframe["loudness"].between(lower, upper)
    other_title = dataframe["title"] != song["title"]
    return dataframe.filter(within_margin).filter(other_title)

In [None]:
""" This function returns the songs, from the dataframe, which have similar hotness as the input song """
def similar_hotness(song, dataframe, margin):
    """Select songs whose hotness lies within ``margin`` of that of ``song``.

    Args:
        song: Row-like object with ``"hotness"`` and ``"title"`` keys.
        dataframe: DataFrame to search; needs ``hotness`` and ``title`` columns.
        margin: Half-width of the accepted hotness interval (bounds included).

    Returns:
        ``dataframe`` filtered to the hotness interval, excluding rows that
        share the title of ``song``.
    """
    lower, upper = song["hotness"] - margin, song["hotness"] + margin
    within_margin = dataframe["hotness"].between(lower, upper)
    other_title = dataframe["title"] != song["title"]
    return dataframe.filter(within_margin).filter(other_title)

In [None]:
""" This function returns the songs, from the dataframe, which have similar danceability as the input song """
def similar_danceability(song, dataframe, margin):
    """Select songs whose danceability lies within ``margin`` of that of ``song``.

    Args:
        song: Row-like object with ``"danceability"`` and ``"title"`` keys.
        dataframe: DataFrame to search; needs ``danceability`` and ``title``
            columns.
        margin: Half-width of the accepted danceability interval (bounds
            included).

    Returns:
        ``dataframe`` filtered to the danceability interval, excluding rows
        that share the title of ``song``.
    """
    lower, upper = song["danceability"] - margin, song["danceability"] + margin
    within_margin = dataframe["danceability"].between(lower, upper)
    other_title = dataframe["title"] != song["title"]
    return dataframe.filter(within_margin).filter(other_title)

In [None]:
""" This function returns the songs, from the dataframe, which are similar to the input song based on tempo, loudness,
hotness and danceability """
# number is the number of generated suggestions
# margins is a dictionary with the 4 attributes above as keys and their corresponding values are a tuple of
# (initial margin, margin_increase)
def similar_model1(song, number, margins, max_iterations=None):
    """Suggest songs close to ``song`` in tempo, loudness, danceability and hotness.

    The four single-attribute filters are chained on ``df_songs``. While fewer
    than ``number`` songs pass all of them, every margin is widened by its own
    increase and the search is repeated from scratch.

    Args:
        song: Row-like object with ``"tempo"``, ``"loudness"``, ``"hotness"``,
            ``"danceability"`` and ``"title"`` keys.
        number: Number of suggestions wanted.
        margins: Dict keyed by ``"tempo"``, ``"loudness"``, ``"hotness"`` and
            ``"danceability"``; each value is ``(initial margin, margin increase)``.
        max_iterations: Optional cap on the number of search rounds. ``None``
            (the default) keeps widening until ``number`` songs are found, which
            does not terminate if the table can never supply that many matches.

    Returns:
        DataFrame with at most ``number`` similar songs.
    """
    def _margin(attribute, iteration):
        # Margin used for `attribute` at the given round: initial + increase * round.
        initial, increase = margins[attribute]
        return increase * iteration + initial

    similar_songs = spark_session.createDataFrame(spark_context.emptyRDD(), songs_schema)
    x = 0
    while similar_songs.count() < number:
        if max_iterations is not None and x >= max_iterations:
            break
        candidates = similar_tempo(song, df_songs, _margin("tempo", x))
        candidates = similar_loudness(song, candidates, _margin("loudness", x))
        # Each filter uses the margins of its own attribute (the danceability
        # and hotness margins used to be swapped).
        candidates = similar_danceability(song, candidates, _margin("danceability", x))
        similar_songs = similar_hotness(song, candidates, _margin("hotness", x))
        x += 1
    return similar_songs.limit(number)

# Core Analysis

### Asking the user for a song 

In [None]:
# Interactive step: prompts for a title and keeps the confirmed song row for the cells below.
user_song = ask_for_song()

### Songs from the same artist

In [None]:
songs_same_artist = same_artist(user_song)
# A single take(10) both tells whether there is any other song and fetches the
# rows to print, so no separate count() job over the table is needed.
some_songs = songs_same_artist.take(10)
if some_songs:
    print(" Songs from the same artist :")
    for song in some_songs:
        print(" * ", song["title"])
else:
    print(" This artist doesn't have any other songs.")
    

### Songs with similar tempo

In [None]:
margin = 10
songs_similar_tempo = similar_tempo(user_song, df_songs, margin).limit(10).collect()
# collect() always returns a list (never None), so test for emptiness instead;
# otherwise the header is printed even when nothing was found.
if songs_similar_tempo:
    print("Songs with similar tempo (with a margin of {}) :".format(margin))
    for song in songs_similar_tempo:
        print(" * " + song_str(song))
else:
    print("No songs with similar tempo (with a margin of {}) were found.".format(margin))

### Songs with similar loudness

In [None]:
margin = 0.1
songs_similar_loudness = similar_loudness(user_song, df_songs, margin).limit(10).collect()
# collect() always returns a list (never None), so test for emptiness instead;
# otherwise the header is printed even when nothing was found.
if songs_similar_loudness:
    print("Songs with similar loudness (with a margin of {}) :".format(margin))
    for song in songs_similar_loudness:
        print(" * " + song_str(song))
else:
    print("No songs with similar loudness (with a margin of {}) were found.".format(margin))

### Songs with similar hotness

In [None]:
margin = 0.1
songs_similar_hotness = similar_hotness(user_song, df_songs, margin).limit(10).collect()
# collect() always returns a list (never None), so test for emptiness instead;
# otherwise the header is printed even when nothing was found.
if songs_similar_hotness:
    print("Songs with similar hotness (with a margin of {}) :".format(margin))
    for song in songs_similar_hotness:
        print(" * " + song_str(song))
else:
    print("No songs with similar hotness (with a margin of {}) were found.".format(margin))

### Songs with similar danceability

In [None]:
margin = 0.1
songs_similar_danceability = similar_danceability(user_song, df_songs, margin).limit(10).collect()
# collect() always returns a list (never None), so test for emptiness instead;
# otherwise the header is printed even when nothing was found.
if songs_similar_danceability:
    print("Songs with similar danceability (with a margin of {}) :".format(margin))
    for song in songs_similar_danceability:
        print(" * " + song_str(song))
else:
    print("No songs with similar danceability (with a margin of {}) were found.".format(margin))

### Songs with similar tempo, loudness, hotness and danceability

In [None]:
# Per-attribute search margins for similar_model1: each value is (initial margin, margin increase per iteration).
margins = {"tempo" : (1, 0.1), "loudness" : (1, 0.2), "hotness" : (1, 0.3), "danceability" : (0.01, 0.02)}

To make things clearer, let's first plot the margins that will be passed to the function below, as a function of the number of iterations:

In [None]:
# Plot, for each attribute, the margin used at every iteration of the model:
#     margin(i) = initial + i * increase
# The (initial, increase) pairs are read from `margins` (defined in the cell
# above) so the figure cannot drift from the values actually passed to the model.
X_iterations = range(20)
labels = ["tempo", "loudness", "hotness", "danceability"]
list_margins = [margins[label] for label in labels]

Y_margins = [[initial + i * increase for i in X_iterations]
             for (initial, increase) in list_margins]

for label, y_values in zip(labels, Y_margins):
    plt.plot(X_iterations, y_values, label=label)

plt.xlabel("Iteration")
plt.ylabel("Margin")
plt.title("Search margin per attribute")
plt.legend()
plt.show()

In [None]:
# Ask the combined model for 10 suggestions and print them.
songs_similar_model1 = similar_model1(user_song, 10, margins)
# The header has no placeholder, so it no longer calls .format() with the
# `margin` variable left over from an earlier cell.
print("Similar songs following the model 1:")
for song in songs_similar_model1.collect():
    print(" * " + song_str(song))

In [None]:
# Display the first rows of the artists table.
df_artists.show()

In [None]:
# Title of the first suggested song. Indexing a DataFrame with [0] selects its
# first *column* rather than its first row, so the row is fetched with first().
songs_similar_model1.first()["title"]

In [None]:
# Look up location, hotness and terms of the artists behind the suggested songs.
# The join key is taken from the DataFrame actually being joined rather than
# from df_songs, so it does not depend on the suggestions sharing df_songs' lineage.
terms = (
    songs_similar_model1
    .join(df_artists, songs_similar_model1["artist_id"] == df_artists["id"])
    .select(df_artists["location"], df_artists["hotness"], df_artists["terms"])
    .collect()
)


In [None]:
# Display the collected (location, hotness, terms) rows.
terms

## Releasing the resources

In [None]:
# Shut down the SparkContext once the analysis is finished.
spark_context.stop()