# Preprocessing 

### Initializing findspark and Reading In Data

In [1]:
import findspark
findspark.init("/usr/local/spark/")
from pyspark.sql import SparkSession
import collections
import pyspark.sql.functions as F # useful for preprocessing columns
import shutil # used in removing folders when writing out our CSV files
import os # used in renaming files when writing out our CSV files

spark = SparkSession.builder \
   .master("local[8]") \
   .appName("RatingsHistogram") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
sc = spark.sparkContext

In [2]:
from pyspark.sql.utils import AnalysisException

try:
    billboard = spark.read.format("csv").option("header", "true").option("escapeQuotes", "true").load("./data/charts.csv")
    spotify = spark.read.format("csv").option("header", "true").option("escapeQuotes", "true").load("./data/spotify.csv")
except AnalysisException:
    # Spark raises an AnalysisException when an input path does not exist
    print("Input files not found. Please check that the files are in the ./data folder.")
    raise

After reading in the data, we also create a function called `output`, based on Spark's `write.csv`. It merges all partitions into one, writes the resulting CSV file into a temporary `./out` folder and then moves it to `<outname>.csv` in the folder of the Jupyter Notebook.

In [3]:
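def output(outdata, outname):
    """Write `outdata` as a single CSV file named `<outname>.csv` in the working directory."""
    temp_dir = "./out"
    output_file = outname + ".csv"
    # Merge all partitions into one so that Spark writes exactly one part file
    outdata.coalesce(1).write.csv(temp_dir, header=True, mode="overwrite")
    # The folder also contains _SUCCESS and .crc files, so pick the part file by name
    part_file = next(f for f in os.listdir(temp_dir) if f.startswith("part-") and f.endswith(".csv"))
    os.replace(os.path.join(temp_dir, part_file), output_file)
    shutil.rmtree(temp_dir)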
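Spark's `write.csv` writes a folder rather than a single file: next to the `part-...` file that holds the data, it usually also creates a `_SUCCESS` marker and, on a local file system, hidden `.crc` checksum files. Since `os.listdir` returns entries in arbitrary order, the data file has to be picked out by name. A minimal sketch of such a lookup, assuming the data file's name starts with `part-` and ends with `.csv` (the helper `find_part_file` is hypothetical, not part of the notebook):

```python
import os
from typing import Optional


def find_part_file(folder: str) -> Optional[str]:
    """Return the path of the CSV part file Spark wrote into `folder`, or None."""
    for name in sorted(os.listdir(folder)):
        # Skips the _SUCCESS marker and hidden checksum files such as .part-00000-...csv.crc
        if name.startswith("part-") and name.endswith(".csv"):
            return os.path.join(folder, name)
    return None


if __name__ == "__main__":
    import tempfile

    # Simulate the layout of a coalesced Spark CSV write (file names are made up)
    with tempfile.TemporaryDirectory() as tmp:
        for name in ["_SUCCESS", "._SUCCESS.crc", ".part-00000-abc.csv.crc", "part-00000-abc.csv"]:
            open(os.path.join(tmp, name), "w").close()
        print(os.path.basename(find_part_file(tmp)))  # part-00000-abc.csv
```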

### Joining Billboard and Spotify Data Sets

To reach our goal of predicting whether a song will make it onto the Billboard Hot100, we first need to join our two data sets. For this, **we need to change the Billboard data set's formatting** to match the Spotify data set, which separates multiple artists with `;` and uses the column names `track_name` and `artists`:

In [4]:
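# Formatting the artist names so that they are separated by ';', as in the Spotify data set
# (the order matters: ' & ' is replaced before ' X ', so an artist like 'Lil Nas X' keeps his name)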
billboard2 = billboard.select("song", "artist")
billboard2 = billboard2.withColumn('artist', F.regexp_replace('artist', ' Featuring ', ';'))
billboard2 = billboard2.withColumn('artist', F.regexp_replace('artist', ' & ', ';'))
billboard2 = billboard2.withColumn('artist', F.regexp_replace('artist', ' x ', ';'))
billboard2 = billboard2.withColumn('artist', F.regexp_replace('artist', ' X ', ';'))
billboard2 = billboard2.selectExpr("song as track_name", "artist as artists")
billboard2.show(5)

+-------------+--------------------+
|   track_name|             artists|
+-------------+--------------------+
|   Easy On Me|               Adele|
|         Stay|The Kid LAROI;Jus...|
|Industry Baby|Lil Nas X;Jack Ha...|
|   Fancy Like|        Walker Hayes|
|   Bad Habits|          Ed Sheeran|
+-------------+--------------------+
only showing top 5 rows
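The four `regexp_replace` calls are applied one after another, and their order matters: if `' X '` were replaced first, a credit such as "Lil Nas X & Artist B" would become "Lil Nas;& Artist B". A pure-Python sketch of the same sequential replacement (the function name `normalize_artists` and the sample artist names are hypothetical; the separators contain no regex special characters, so plain `str.replace` behaves like the Spark calls):

```python
# Separators between credited artists, in the same order as the Spark cell above
SEPARATORS = [" Featuring ", " & ", " x ", " X "]


def normalize_artists(artist: str) -> str:
    """Join credited artists with ';' as in the Spotify data set."""
    for sep in SEPARATORS:
        artist = artist.replace(sep, ";")
    return artist


print(normalize_artists("Artist A & Artist B"))   # Artist A;Artist B
print(normalize_artists("Lil Nas X & Artist B"))  # Lil Nas X;Artist B
```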



Afterwards, we <ins>create two subsets</ins>: the Spotify songs that appear on the Billboard Hot100 (`result1`, via a left semi join) and those that don't (`result2`, via a left anti join). We add a new column `billboard` with the value `True` or `False` to each subset and then union them back together.

In [5]:
result1 = spotify.join(billboard2, ['track_name','artists'], 'semi').withColumn('billboard', F.lit(True))
result2 = spotify.join(billboard2, ['track_name','artists'], 'anti').withColumn('billboard', F.lit(False))
result = result1.union(result2)  # union() replaces the deprecated alias unionAll()
result.show(2)

+-------------+--------------------+---+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+---------+
|   track_name|             artists|_c0|            track_id|          album_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|billboard|
+-------------+--------------------+---+--------------------+--------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+---------+
|Say Something|A Great Big World...|  6|6Vc5wAMmXdKIAM7WU...|Is There Anybody ...|        74|     229400|   False|       0.407| 0.147|  2|  -8.822|   1|     0.0355|       0.857|        2.89e-06|  0.0913| 0.0765|141.284|          
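A left semi join keeps the Spotify rows that have at least one match on (`track_name`, `artists`) in `billboard2`, without adding columns from `billboard2` and without duplicating a row when a song appears in several rows of the chart data; a left anti join keeps exactly the rows without a match. The two results therefore split the Spotify data without overlap, and their union labels every row once. The same labelling logic as a pure-Python sketch (the function `label_billboard` and the sample rows are hypothetical):

```python
from typing import Dict, List, Set, Tuple


def label_billboard(spotify_rows: List[Dict[str, str]],
                    billboard_keys: Set[Tuple[str, str]]) -> List[Dict[str, object]]:
    """Add billboard=True if the row's (track_name, artists) pair charted, else False."""
    labeled = []
    for row in spotify_rows:
        key = (row["track_name"], row["artists"])
        # Exact string match, as in the Spark join: spelling differences count as no match
        labeled.append({**row, "billboard": key in billboard_keys})
    return labeled


rows = [{"track_name": "Song A", "artists": "X;Y"},
        {"track_name": "Song B", "artists": "Z"}]
print([r["billboard"] for r in label_billboard(rows, {("Song A", "X;Y")})])  # [True, False]
```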

Next, we write the result to `result.csv` using our `output` function. This is only an intermediate file, which we split up in order to call the Musixmatch API on it. (We commented out the call so as not to overwrite the existing file.)

In [6]:
dedup = result.dropDuplicates(['track_name', 'artists'])
dedup = dedup.where(dedup._c0 != 80832) # dropping one specific entry with faulty formatting

# output(dedup, "result")

### Preparing Data for the Musixmatch API

Now that we have our joined data set, we need to **use the Musixmatch API to get the lyrics of our songs**. Unfortunately, the API is limited to 2,000 songs per day per key. To speed up the process, we split the data into four parts, one for each team member, so that we can join them back together once we have the lyrics.

In [8]:
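# Create four CSV files, each with a quarter of the songs (around 20,000),
# so that we can split up the API calling process
import math
from pyspark.sql.functions import row_number, col
from pyspark.sql.window import Window

# Order by the original index so that the row numbers are the same in each of the
# four filter passes below (ordering by a constant would make them arbitrary)
w = Window.orderBy(col('_c0').cast('int'))
df = dedup.withColumn("row_num", row_number().over(w))
total = df.count()
num = math.ceil(total / 4)

for i in [1, 2, 3, 4]:
    # row_number() starts at 1 and between() is inclusive on both ends
    begin = (i - 1) * num + 1
    end = min(i * num, total)
    dfnew = df.filter(col('row_num').between(begin, end))
    # row_num stays in the split files; the merge step later drops it as column "24"
    dfnew = dfnew.withColumnRenamed('_c0', 'id')
    dfnew.toPandas().to_csv(f'./splits/split{i}.csv')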
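Because `row_number()` counts from 1 and `between` is inclusive on both ends, the split boundaries have to be chosen carefully so that no row is dropped or duplicated. A pure-Python sketch of such boundaries (the function `split_bounds` is hypothetical), using a ceiling so that the last split absorbs the remainder:

```python
import math
from typing import List, Tuple


def split_bounds(total_rows: int, parts: int) -> List[Tuple[int, int]]:
    """Inclusive (begin, end) ranges over 1-based row numbers, one per non-empty part."""
    size = math.ceil(total_rows / parts)
    bounds = []
    for i in range(parts):
        begin = i * size + 1
        end = min((i + 1) * size, total_rows)
        if begin <= end:  # skip empty parts when there are very few rows
            bounds.append((begin, end))
    return bounds


print(split_bounds(10, 4))  # [(1, 3), (4, 6), (7, 9), (10, 10)]
```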

### Calling the Musixmatch API

The following code is the same code that each of our team members used to **get the lyrics** for their split of the data set. It is *just for demonstration purposes*, since each of us had to modify it every day with the appropriate song IDs and API keys.

#### Setting up the API

Note that we have **removed our API keys and client secrets** for security purposes. If you would like to try out this part of the code, please reach out to our team or insert your own Spotify and Musixmatch API keys.
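Rather than pasting keys into the notebook, one option is to read them from environment variables. A minimal sketch; the variable names `SPOTIFY_CLIENT_ID`, `SPOTIFY_CLIENT_SECRET` and `MUSIXMATCH_API_KEY` and the helper `load_api_keys` are hypothetical choices, not something either API prescribes:

```python
import os


def load_api_keys() -> dict:
    """Read the Spotify and Musixmatch credentials from (hypothetical) environment variables."""
    keys = {
        "client_id": os.environ.get("SPOTIFY_CLIENT_ID"),
        "client_secret": os.environ.get("SPOTIFY_CLIENT_SECRET"),
        "api_key": os.environ.get("MUSIXMATCH_API_KEY"),
    }
    # Fail early instead of sending empty credentials to the APIs
    missing = [name for name, value in keys.items() if not value]
    if missing:
        raise RuntimeError(f"Missing credentials: {', '.join(missing)}")
    return keys
```

The `client_id`, `client_secret` and `api_key` variables in the next cell could then be filled from the returned dictionary, e.g. `client_id = load_api_keys()["client_id"]`.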

In [9]:
import requests
import base64
import json
import csv

# Spotify API Client ID and Secret
# Note: we removed the keys for security purposes, if you want to try out the API calls, please reach out to our group
client_id = 'SPOTIFY CLIENT ID HERE'
client_secret = 'SPOTIFY SECRET HERE'

# Base64-encode "client_id:client_secret" for HTTP Basic authentication
auth_str = f"{client_id}:{client_secret}"
auth_bytes = auth_str.encode('utf-8')
auth_base64 = base64.b64encode(auth_bytes).decode('utf-8')

# Set up the request headers and body
headers = {
    'Authorization': f'Basic {auth_base64}'
}

data = {
    'grant_type': 'client_credentials'
}

# Make the POST request to the Spotify API token endpoint
response = requests.post('https://accounts.spotify.com/api/token', headers=headers, data=data)

# A status code of 200 means the request succeeded and the response contains the access token
# The access token is valid for ONE HOUR, after that we need a new token
if response.status_code == 200:
    access_token = response.json().get('access_token')
    print(f"Access Token: {access_token}")
else:
    print(f"Failed to retrieve access token: {response.status_code}")
    print(response.json())
    
# Musixmatch API
api_key = 'MUSIXMATCH API KEY HERE'

# Define the endpoint and parameters for the API request
base_url = 'https://api.musixmatch.com/ws/1.1/'
track_search_endpoint = 'track.search'
lyrics_get_endpoint = 'track.lyrics.get'

Access Token: BQDKI7Utobbx6k47jJwhrmHTe52TdjS8_P6wYcxalCZ6VhTU-3t9duzFG2d8wXlubmxbpjcL2Xta6iwKYOSk786FjlW0MjJRcnZFOdj2Msf6dzmf7E0


#### Function to search for a track on the Musixmatch API

After setting up the APIs, we **created a function** that **returns the song's Musixmatch track ID** given the track name and the artist, or "No track found." if the search fails. This ID is the input to our next function, which fetches the lyrics.

In [10]:
def search_track(track_name, artist_name):
    search_url = base_url + track_search_endpoint
    params = {
        'q_track': track_name,
        'q_artist': artist_name,
        'apikey': api_key,
        's_track_rating': 'desc',
        'f_has_lyrics': 'true'
    }
    # The timeout (in seconds) is our choice; without one a stalled request blocks forever
    response = requests.get(search_url, params=params, timeout=10)
    if response.status_code == 200:
        try:
            result = response.json()
            message = result.get('message')
            if message:
                body = message.get('body')
                if body:
                    track_list = body.get('track_list')
                    if track_list:
                        return track_list[0].get('track').get('track_id')
            return "No track found."
        except Exception as e:
            print(f"Error parsing response: {e}")
            print(response.text)
            return "No track found."
    else:
        print(f"Failed to search track: {response.status_code}")
        # Error responses are not necessarily JSON, so print the raw text
        print(response.text)
        return "No track found."
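The nested checks in `search_track` walk the JSON path `message -> body -> track_list[0] -> track -> track_id`. A sketch of the same traversal as a standalone function that can be tested without calling the API; the function `extract_track_id` is hypothetical, it returns `None` instead of the "No track found." string, and the sample payload below is hand-written, containing only the fields `search_track` reads rather than a full Musixmatch response:

```python
from typing import Any, Optional


def extract_track_id(payload: Any) -> Optional[Any]:
    """Follow message -> body -> track_list[0] -> track -> track_id; None if a step is missing."""
    message = payload.get("message") if isinstance(payload, dict) else None
    body = message.get("body") if isinstance(message, dict) else None
    track_list = body.get("track_list") if isinstance(body, dict) else None
    if not track_list:
        return None
    first = track_list[0]
    track = first.get("track") if isinstance(first, dict) else None
    return track.get("track_id") if isinstance(track, dict) else None


# Hand-written payload; the track ID is made up
sample = {"message": {"body": {"track_list": [{"track": {"track_id": 12345}}]}}}
print(extract_track_id(sample))  # 12345
```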

#### Function to get lyrics for a track on the Musixmatch API

Afterwards, we created the function `get_lyrics`, which fetches a song's lyrics from the Musixmatch API based on its track ID. It returns a tuple of the lyrics and their copyright notice, or the placeholder "No lyrics found." when the lyrics cannot be retrieved. (Note, however, that the free version of the Musixmatch API only returns 30% of the lyrics of each song.)

In [11]:
def get_lyrics(track_id):
    lyrics_url = base_url + lyrics_get_endpoint
    params = {
        'track_id': track_id,
        'apikey': api_key
    }
    # The timeout (in seconds) is our choice; without one a stalled request blocks forever
    response = requests.get(lyrics_url, params=params, timeout=10)
    if response.status_code == 200:
        try:
            result = response.json()
            message = result.get('message')
            if message:
                body = message.get('body')
                if body:
                    lyrics = body.get('lyrics')
                    if lyrics:
                        lyrics_body = lyrics.get('lyrics_body')
                        lyrics_copyright = lyrics.get('lyrics_copyright', 'No lyrics found')
                        return lyrics_body, lyrics_copyright
            return "No lyrics found.", "No lyrics found"
        except Exception as e:
            print(f"Error parsing response: {e}")
            print(response.text)
            return "No lyrics found.", "No lyrics found"
    else:
        print(f"Failed to get lyrics: {response.status_code}")
        # Error responses are not necessarily JSON, so print the raw text
        print(response.text)
        return "No lyrics found.", "No lyrics found"

**To demonstrate**, we load the first 10 rows of the first split and show that the lyrics are indeed retrieved.

In [12]:
file = spark.read.format("csv").option("header", "true").option("escapeQuotes", "false").load("./splits/split1.csv")
# _c0 is the pandas index of the split file, so rows 0 to 9 are its first 10 rows
subset = file.filter(col('_c0').between(0, 9))
subset.show(n = 1)

+---+--------------------+-------+-----+--------------------+------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+---------+-------+
|_c0|          track_name|artists|   id|            track_id|        album_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|track_genre|billboard|row_num|
+---+--------------------+-------+-----+--------------------+------------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+-----------+---------+-------+
|  0|"""Call Me - Sing...|Blondie|82022|3x0lg7bRDP2LCn65J...|80s Bangers Vol. 1|         6|     212706|   False|       0.557| 0.832|  2|  -6.708|   0|     0.0325|    0.000868|         0.00128|  0.0975|   0.75|14

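Next, we defined a wrapper function, `customFunction`, that looks up a row's Musixmatch track ID, fetches its lyrics and appends them to the row, so that we can **apply it to every row of the subset with an RDD `map`**.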

In [13]:
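def customFunction(row):
    """Look up the Musixmatch track ID for a row, fetch its lyrics and append them to the row."""
    track_id = search_track(row.track_name, row.artists)
    if track_id == "No track found.":
        # Skip the lyrics call to save API quota when the search found nothing
        lyrics = "No lyrics found."
    else:
        lyrics, _ = get_lyrics(track_id)  # the copyright notice is not used
    # Row is a tuple, so this returns a plain tuple; its columns become _1, _2, ... below
    return row + (lyrics,)

subset2 = subset.rdd.map(customFunction)

# SQLContext is deprecated; the SparkSession can create the DataFrame directly
df = spark.createDataFrame(subset2)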

In [14]:
df.show(n = 1, truncate = 0)

+---+-------------------------------------------------------------------+-------+-----+----------------------+------------------+---+------+-----+-----+-----+---+------+---+------+--------+-------+------+----+-------+---+---------+-----+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_1 |_2                                                                 |_3     |_4   |_5                    |_6                |_7 |_8    |_9   |_10  |_11  |_12|_13   |_14|_15   |_16     |_17    |_18   |_19 |_20    |_21|_22      |_23  |_24|_25                                                                                                                       
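`pyspark.sql.Row` subclasses `tuple`, so `row + (lyrics,)` in `customFunction` returns a plain tuple and the field names are lost; `createDataFrame` then falls back to the positional names `_1` to `_25` seen above. The same behaviour can be reproduced with a `namedtuple`, used here only as a stand-in so that the snippet runs without Spark (the `Song` type and its values are illustrative):

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, which is also a tuple subclass
Song = namedtuple("Song", ["track_name", "artists"])

row = Song("Call Me", "Blondie")
extended = row + ("some lyrics",)  # tuple concatenation drops the field names

print(type(extended).__name__)  # tuple
print(extended)                 # ('Call Me', 'Blondie', 'some lyrics')
```

Keeping the names would mean building a new `Row` from `row.asDict()` plus a `lyrics` field; since the merge step below works with the positional names, it would have to be adapted as well.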

### Merging Files

After obtaining our lyrics, we **collected all of the split data sets** in our `merge` folder. (<ins>Note:</ins> we also included additional data in that folder. It comes from running the Spotify API on the songs of the Billboard data set to obtain their song information, which adds more Billboard songs to our data and thus gives us a bigger sample size when creating a balanced data set. That chunk of data also needed preprocessing, which can be found in the `merge_billboard` folder.)

Our goal is now to **merge all of these individual files** into one data frame.

In [17]:
file_paths = []
for root, dirs, files in os.walk(os.path.abspath("./merge")):
    for file in files:
        # Only pick up CSV files (skips e.g. .DS_Store or other non-data files)
        if file.endswith(".csv"):
            file_paths.append(os.path.join(root, file))
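`os.walk` also descends into hidden folders such as `.ipynb_checkpoints`, and the order of the files it returns depends on the file system. A sketch of a collector that prunes hidden folders and sorts the result so that the order is reproducible (the function `collect_csv_paths` is hypothetical; note that the cells below treat the first path as the template for the column names, so the order matters):

```python
import os
from typing import List


def collect_csv_paths(root: str) -> List[str]:
    """Return the sorted absolute paths of all .csv files below `root`, skipping hidden folders."""
    paths = []
    for dirpath, dirnames, filenames in os.walk(os.path.abspath(root)):
        # Editing dirnames in place tells os.walk not to descend into hidden folders
        dirnames[:] = [d for d in dirnames if not d.startswith(".")]
        for name in filenames:
            if name.endswith(".csv"):
                paths.append(os.path.join(dirpath, name))
    return sorted(paths)
```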

In [18]:
combined_df = spark.read.csv(
    file_paths[0],
    header=True,  
    quote='"',   
    escape='"',   
    multiLine=True, 
    inferSchema=True
)

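To merge the files, we had to **recast some variables**: with `inferSchema=True`, the same column can be read with a different type in different files, especially when a file contains faulty observations, and `union` fails when the column types are incompatible.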

In [19]:
combined_df = combined_df.withColumn("9", combined_df["9"].cast("string"))
combined_df = combined_df.withColumn("23", combined_df["23"].cast("string"))
combined_df = combined_df.withColumn("14", combined_df["14"].cast("integer"))
combined_df = combined_df.drop("_c0", "_1")

In [20]:
# Target type for each column, by position, after dropping "_c0" and "_1";
# the columns are renamed "_2" to "_25" so that they line up with combined_df
target_types = ["string", "string", "integer", "string", "string", "integer",
                "integer", "string", "double", "double", "integer", "double",
                "integer", "double", "double", "double", "double", "double",
                "double", "integer", "string", "string", "integer", "string"]

# Loop through the remaining file paths and union each one
for file_path in file_paths[1:]:
    temp_df = spark.read.csv(
        file_path,
        header=True,      # the first row is a header
        quote='"',        # double quotes enclose fields containing commas
        escape='"',       # a double quote inside a field is escaped by another double quote
        multiLine=True,   # lyrics span multiple lines
        inferSchema=True  # infer the column types from the data
    )
    try:
        # drop() silently ignores columns that do not exist
        temp_df = temp_df.drop("_c0", "_1")
        temp_df = temp_df.select([temp_df[i].cast(t).alias(f"_{i + 2}")
                                  for i, t in enumerate(target_types)])
        # union() matches columns by position, not by name
        combined_df = combined_df.union(temp_df)
    except Exception:
        print(file_path, "didn't work\n", temp_df.dtypes)

Afterwards, we **rename the columns** and **drop duplicates**. We also clean up some columns: we drop songs without lyrics and observations with a missing `loudness` value, and make sure that our binary variables (`explicit` and `billboard`) contain only boolean values.

In [21]:
combined_df2 = combined_df.drop("4", "24")

df = (combined_df2
              .withColumnRenamed("_2", "track_name")
              .withColumnRenamed("_3", "artist")
              .withColumnRenamed("5", "track_id")
              .withColumnRenamed("6", "album_name")
              .withColumnRenamed("7", "popularity")
              .withColumnRenamed("8", "duration_ms")
              .withColumnRenamed("9", "explicit")
              .withColumnRenamed("10", "danceability")
              .withColumnRenamed("11", "energy")
              .withColumnRenamed("12", "key")
              .withColumnRenamed("13", "loudness")
              .withColumnRenamed("14", "mode")
              .withColumnRenamed("15", "speechiness")
              .withColumnRenamed("16", "acousticness")
              .withColumnRenamed("17", "instrumentalness")
              .withColumnRenamed("18", "liveness")
              .withColumnRenamed("19", "valence")
              .withColumnRenamed("20", "tempo")
              .withColumnRenamed("21", "time_signature")
              .withColumnRenamed("22", "track_genre")
              .withColumnRenamed("23", "billboard")
              .withColumnRenamed("25", "lyrics"))
df = df.filter(df.lyrics != "No lyrics found.")

In [22]:
dedup = df.dropDuplicates(['track_name', 'artist'])

from pyspark.sql.functions import col
# Keep only rows whose binary columns hold a boolean value (chained, so both filters apply)
dedup2 = dedup.filter((dedup.billboard == True) | (dedup.billboard == False))
dedup2 = dedup2.filter((dedup2.explicit == True) | (dedup2.explicit == False))
dedup3 = dedup2.filter(dedup2.loudness.isNotNull())

Next, we check whether the `billboard` column, which will be our target variable, contains only valid entries.

In [23]:
# Counting each value in the `billboard` column
dedup3.groupBy("billboard").count().show()

+---------+-----+
|billboard|count|
+---------+-----+
|    False|13295|
|    false|39544|
|     True| 2348|
|     true| 6575|
+---------+-----+



As we can see, the column mixes `True`/`False` with `true`/`false`, so we need to **standardize it.**

In [24]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import when

In [25]:
from pyspark.sql.utils import AnalysisException

try:
    # Standardizing the "billboard" column: "True"/"true" -> "True", everything else -> "False"
    dedup4 = dedup3.withColumn("billboard", when(col("billboard").isin("True", "true"), "True").otherwise("False"))
    # Using StringIndexer to turn billboard into a numerical column; alphabetical order
    # fixes the encoding at "False" -> 0.0 and "True" -> 1.0 (the default, frequencyDesc,
    # would give 0.0 to whichever label happens to be more frequent)
    indexer = StringIndexer(inputCol="billboard", outputCol="billboard_numeric",
                            handleInvalid="skip", stringOrderType="alphabetAsc")
    dedup4 = indexer.fit(dedup4).transform(dedup4)
except AnalysisException as e:
    print("Error when processing billboard column transformation:", e)
    raise

# Show the number of entries for each value in the new "billboard_numeric" column after the conversion
dedup4.groupBy("billboard_numeric").count().show()

+-----------------+-----+
|billboard_numeric|count|
+-----------------+-----+
|              0.0|52839|
|              1.0| 8923|
+-----------------+-----+
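StringIndexer's default `stringOrderType`, `frequencyDesc`, gives index 0.0 to the most frequent label and breaks ties alphabetically. With 52,839 `False` rows against 8,923 `True` rows, that ordering also maps `False` to 0.0, but it would flip if the class balance ever did. A small pure-Python mimic of the default ordering (the function `frequency_desc_index` is hypothetical and only illustrates the rule):

```python
from collections import Counter
from typing import Dict, List


def frequency_desc_index(labels: List[str]) -> Dict[str, float]:
    """Most frequent label -> 0.0, next -> 1.0, ...; ties are broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


print(frequency_desc_index(["False"] * 3 + ["True"]))  # {'False': 0.0, 'True': 1.0}
print(frequency_desc_index(["False"] + ["True"] * 3))  # {'True': 0.0, 'False': 1.0}
```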



In [26]:
dedup4 = dedup4.drop("billboard")
dedup4 = dedup4.withColumnRenamed("billboard_numeric", "billboard")
dedup4.groupBy("billboard").count().show()

+---------+-----+
|billboard|count|
+---------+-----+
|      0.0|52839|
|      1.0| 8923|
+---------+-----+



Finally, we clean up the `lyrics` column by removing asterisks and the "This Lyrics is NOT for Commercial use" notice, and write the result to `final.csv`. (Note: we used pandas in this part of the code because Spark's CSV output struggled with the many quotes and commas in the lyrics column. In our case, the pandas version was also significantly faster.)

In [27]:
dedup4 = dedup4.withColumn('lyrics', F.regexp_replace('lyrics', r'\*', ''))
dedup4 = dedup4.withColumn('lyrics', F.regexp_replace('lyrics', 'This Lyrics is NOT for Commercial use', ''))
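The two `regexp_replace` calls remove every `*` and then the notice text, but leave any whitespace around the notice in place. A pure-Python equivalent for checking the result on a single string (the function `clean_lyrics` and the sample text are hypothetical; the sample only imitates a footer of asterisks around the notice):

```python
import re

NOTICE = "This Lyrics is NOT for Commercial use"


def clean_lyrics(lyrics: str) -> str:
    """Apply the same two replacements as the Spark cell: drop '*' and the notice text."""
    lyrics = re.sub(r"\*", "", lyrics)
    return lyrics.replace(NOTICE, "")


sample = "Line one\nLine two\n\n******* This Lyrics is NOT for Commercial use *******"
print(repr(clean_lyrics(sample)))  # 'Line one\nLine two\n\n  '
```

A trailing `.strip()` could be added if the leftover whitespace matters for later text processing.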

In [28]:
dedup4.toPandas().to_csv("final.csv")