In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below

def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.sql.warehouse.dir,file:/users/home/uwi14/Uditha/Assignment2/spark-warehouse
spark.driver.memory,4g
spark.driver.port,46645
spark.executor.memory,4g
spark.app.id,app-20240528201721-0312
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.executor.cores,2


In [3]:
# Write your imports and code here or insert cells below

from pyspark.sql import functions as F
from pyspark.sql.types import *

In [4]:
# Determine ideal number of partitions

conf = sc.getConf()

N = int(conf.get("spark.executor.instances"))
M = int(conf.get("spark.executor.cores"))
partitions = 4 * N * M

print(f'ideal # partitions = {partitions}')

ideal # partitions = 32


# Q2 - A

### (a) Some of the tracks in the Million Song Dataset were matched to the wrong songs in the Taste Profile dataset and there is a list of song-track mismatches that were automatically identified and another list of mismatches that were manually accepted. Do we need to take any action to account for this based on the datasets that we are using in the sections below and how we are using them? Read the blog post mentioned above and think about if you need to match the datasets at all.

#### According to the Million Song's blog (Bertin-Mahieux, 2011), some tracks in the Million Song Dataset were matched to the wrong songs in the Taste Profile dataset. In the second part of the assignment, audio similarity was examined using one of the feature datasets and all music genre datasets. In the third part of the assignment, songs were recommended using the Taste Profile dataset. Therefore, in the second part, only track IDs were dealt with, and in the third part, only song IDs were addressed. Consequently, no action was needed regarding the mismatched songs, as song IDs and track IDs did not overlap throughout the assignment.

### This is an example of removing the mismatched songs

In [5]:

#firstly, load the taste profile dataset
schema_triplets = StructType([
    StructField("user_id", StringType(), True),
    StructField("song_id",StringType(), True),
    StructField("play_count", IntegerType(), True)
])

# Load triplets data from HDFS
triplets = spark.read.csv("/data/msd/tasteprofile/triplets.tsv/", sep = '\t', schema = schema_triplets).repartition(partitions)
triplets.cache()

print('\n')
print(f'**Conclusion: The raw triplets table has {triplets.count()} rows.')
show_as_html(triplets, 5 )



**Conclusion: The raw triplets table has 48373586 rows.


Unnamed: 0,user_id,song_id,play_count
0,afd3cb5fabe9685212317d5adf380ad40b29352c,SOMJIRK12A8C13D68D,1
1,49f6982f00e7b286cf068e03c04e97e909957841,SOIYWPZ12A81C204EF,1
2,e5a2ac098410c040ba03255fcc5c6a63a51c88e3,SOORDDG12A8AE461D0,1
3,2fab7f9fd56fdcf1131b50db78ac0eccf7fb8dab,SOXDLUF12AC4687779,9
4,85e83dd0e4158b49028882642a536246205624d9,SOJTLHS12A8C13F633,4


In [6]:
#load the mismatched information
sid_mismatches = spark.read.text("hdfs:///data/msd/tasteprofile/mismatches/sid_mismatches.txt").repartition(partitions)
sid_mismatches.head(5)

[Row(value='ERROR: <SOSHAHT12A8AE498DF TRGHCMN128F42422EB> Gene Kelly  -  Heavenly Music  !=  Summer Stock feat. MGM Studio Orchestra  -  Little Brown Jug [In The Good Old Summertime_ 1949]'),
 Row(value="ERROR: <SOTVIGC12CF5F87EFA TRGLNBL128F9324F09>   -    !=  Lyfe Jennings  -  Let's Stay Together"),
 Row(value='ERROR: <SOMIVVH12A58A7A4F3 TRBAYLZ128F933841B>   -    !=  Daryl Hall & John Oates  -  You Make My Dreams'),
 Row(value='ERROR: <SOLPMEO12A8C136EC2 TRBWLFD128F4261A6A>   -    !=  Girl Talk  -  Friday Night'),
 Row(value='ERROR: <SOCVMRY12A8AE486A6 TRGEDPP128F428AE71> Eurythmics  -  When Tomorrow Comes  !=  USAF Concert Band and Singing Sergeants  -  Variations on a West Country Carol')]

In [7]:
sid_mismatches = (sid_mismatches
                 .select(
                     F.substring(F.col('value'), 9,18).alias('song_id'),
                     F.substring(F.col('value'), 28,18).alias('track_id') ,
                     ))


sid_mismatches.printSchema()
show_as_html(sid_mismatches, 5)

print('\n')
print(f'The mismatches table has {sid_mismatches.count()} rows.')

root
 |-- song_id: string (nullable = true)
 |-- track_id: string (nullable = true)



Unnamed: 0,song_id,track_id
0,SOSHAHT12A8AE498DF,TRGHCMN128F42422EB
1,SOTVIGC12CF5F87EFA,TRGLNBL128F9324F09
2,SOMIVVH12A58A7A4F3,TRBAYLZ128F933841B
3,SOLPMEO12A8C136EC2,TRBWLFD128F4261A6A
4,SOCVMRY12A8AE486A6,TRGEDPP128F428AE71




The mismatches table has 19094 rows.


In [9]:
sid_matches_manually_accepted = spark.read.text('/data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt')
sid_matches_manually_accepted.head(5)

[Row(value='9d8'),
 Row(value='< ERROR: <SOFQHZM12A8C142342 TRMWMFG128F92FFEF2> Josipa Lisac  -  razloga  !=  Lisac Josipa  -  1000 razloga'),
 Row(value='19d17'),
 Row(value='< ERROR: <SODXUTF12AB018A3DA TRMWPCD12903CCE5ED> Lutan Fyah  -  Nuh Matter the Crisis Feat. Midnite  !=  Midnite  -  Nah Matter the Crisis'),
 Row(value='29d26')]

In [10]:
sid_matches_manually_accepted.createOrReplaceTempView('sid_matches_manually_accepted_v')

sid_matches_manually_accepted = spark.sql(f"""select substr(value,11,18) as song_id
,substr(value,30,18) as track_id
from sid_matches_manually_accepted_v  
where value like '%ERROR%'          
          """) 

In [12]:
sid_matches_manually_accepted.show(5)
print('\n')
print(f'The manually accepted mismatches {sid_matches_manually_accepted.count()} ')

+------------------+------------------+
|           song_id|          track_id|
+------------------+------------------+
|SOFQHZM12A8C142342|TRMWMFG128F92FFEF2|
|SODXUTF12AB018A3DA|TRMWPCD12903CCE5ED|
|SOASCRF12A8C1372E6|TRMHIPJ128F426A2E2|
|SOITDUN12A58A7AACA|TRMHXGK128F42446AB|
|SOLZXUM12AB018BE39|TRMRSOF12903CCF516|
+------------------+------------------+
only showing top 5 rows



The manually accepted mismatches 489 


In [15]:
#deal with one situation: some matches are mistaken but manually accepted
sid_mismatches_outlier = sid_mismatches.join(
  sid_matches_manually_accepted,
  on = 'song_id',
  how="leftanti"
)

print(f'The raw count of mismatched song items: {sid_mismatches_outlier.count()}')
print(f'The new count of mismatched song items: {sid_mismatches.count()}')
print(f'There are {sid_mismatches.count() -sid_mismatches_outlier.count()} song items deleted.')

The raw count of mismatched song items: 19093
The new count of mismatched song items: 19094
There are 1 song items deleted.


In [18]:
triplets_new = triplets.join(
  sid_mismatches_outlier,
  on = 'song_id',
  how="leftanti"
)
print(f'The new row count of triplets: {triplets_2_removed.count()}.')
print(f'After {triplets.count() -triplets_new.count()} rows are deleted from Taste Profile Dataset.')
show_as_html(triplets_new, 5)

The new row count of triplets: 45795111.
After 2578475 rows are deleted from Taste Profile Dataset.


Unnamed: 0,song_id,user_id,play_count
0,SOMJIRK12A8C13D68D,afd3cb5fabe9685212317d5adf380ad40b29352c,1
1,SOIYWPZ12A81C204EF,49f6982f00e7b286cf068e03c04e97e909957841,1
2,SOORDDG12A8AE461D0,e5a2ac098410c040ba03255fcc5c6a63a51c88e3,1
3,SOXDLUF12AC4687779,2fab7f9fd56fdcf1131b50db78ac0eccf7fb8dab,9
4,SOJTLHS12A8C13F633,85e83dd0e4158b49028882642a536246205624d9,4


# Q2 - B

In [20]:
file_names = [
    "msd-jmir-area-of-moments-all-v1.0",
    "msd-jmir-lpc-all-v1.0",
    "msd-jmir-methods-of-moments-all-v1.0",
    "msd-jmir-mfcc-all-v1.0",
    "msd-jmir-spectral-all-all-v1.0",
    "msd-jmir-spectral-derivatives-all-all-v1.0",
    "msd-marsyas-timbral-v1.0",
    "msd-mvd-v1.0",
    "msd-rh-v1.0",
    "msd-rp-v1.0",
    "msd-ssd-v1.0",
    "msd-trh-v1.0",
    "msd-tssd-v1.0"
]

type_column_dic = {
    "real":DoubleType(),
    "NUMERIC":DoubleType(),
    "float":DoubleType(),
    "string":StringType(),
    "STRING":StringType()    
}

datasets = []

for name in file_names:    
    path_attributes = f'/data/msd/audio/attributes/{name}.attributes.csv'
    path_feature = f'/data/msd/audio/features/{name}.csv'
    
    attributes = spark.read.csv(path_attributes).toPandas()
        
    schema = StructType([
        StructField(column_name, type_column_dic[type_column_1], True) for i, (column_name, type_column_1) in attributes.iterrows()
    ])
    
    datasets.append((name, path_feature, schema))

In [21]:
#test one load
name, path_feature, schema = datasets[0]
id_name = schema[-1].name

# Load data    
data_0 = spark.read.csv(path_feature, schema = schema).repartition(partitions)
        
data_0 = data_0.withColumn(id_name, F.regexp_replace(F.col(id_name), "'", ""))

# Display
print(name)
print(data_0.count())
data_0.printSchema()
print(data_0.head())
show_as_html(data_0, 5)

msd-jmir-area-of-moments-all-v1.0
994623
root
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_1: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_2: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_3: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_4: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_5: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_6: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_7: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_8: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_9: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Standard_Deviation_10: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Average_1: double (nullable = true)
 |-- Area_Method_of_Moments_Overall_Average_2: doub

Unnamed: 0,Area_Method_of_Moments_Overall_Standard_Deviation_1,Area_Method_of_Moments_Overall_Standard_Deviation_2,Area_Method_of_Moments_Overall_Standard_Deviation_3,Area_Method_of_Moments_Overall_Standard_Deviation_4,Area_Method_of_Moments_Overall_Standard_Deviation_5,Area_Method_of_Moments_Overall_Standard_Deviation_6,Area_Method_of_Moments_Overall_Standard_Deviation_7,Area_Method_of_Moments_Overall_Standard_Deviation_8,Area_Method_of_Moments_Overall_Standard_Deviation_9,Area_Method_of_Moments_Overall_Standard_Deviation_10,...,Area_Method_of_Moments_Overall_Average_2,Area_Method_of_Moments_Overall_Average_3,Area_Method_of_Moments_Overall_Average_4,Area_Method_of_Moments_Overall_Average_5,Area_Method_of_Moments_Overall_Average_6,Area_Method_of_Moments_Overall_Average_7,Area_Method_of_Moments_Overall_Average_8,Area_Method_of_Moments_Overall_Average_9,Area_Method_of_Moments_Overall_Average_10,MSD_TRACKID
0,0.06341,3352.0,7133.0,39930000.0,87640000.0,192000000.0,877700000000.0,784000000.0,1720000000.0,9373000000000.0,...,5765.0,13010.0,-44430000.0,-98870000.0,-219500000.0,771100000000.0,882100000.0,1963000000.0,8358000000000.0,TRVPMCC128F931FC22
1,1.018,3357.0,19800.0,39900000.0,235000000.0,1383000000.0,876200000000.0,2105000000.0,12400000000.0,177900000000000.0,...,5747.0,34040.0,-44260000.0,-262000000.0,-1548000000.0,767600000000.0,2338000000.0,13840000000.0,158300000000000.0,TRDGVLU128F14872A8
2,0.1557,3357.0,4951.0,40050000.0,58890000.0,86430000.0,881900000000.0,528200000.0,776500000.0,2793000000000.0,...,5774.0,8473.0,-44560000.0,-65490000.0,-95970000.0,774400000000.0,586000000.0,861200000.0,2451000000000.0,TRDFROT128F9333EA6
3,0.8018,3354.0,27210.0,39980000.0,328400000.0,2697000000.0,879600000000.0,2937000000.0,24140000000.0,490800000000000.0,...,5770.0,46060.0,-44510000.0,-356600000.0,-2856000000.0,772900000000.0,3184000000.0,25540000000.0,404800000000000.0,TRVJNAV128F4245695
4,1.026,6738.0,43210.0,161900000.0,1051000000.0,6819000000.0,7173000000000.0,9478000000.0,61510000000.0,1969000000000000.0,...,11640.0,78340.0,-180800000.0,-1203000000.0,-7996000000.0,6321000000000.0,10840000000.0,72130000000.0,1823000000000000.0,TRSJKZR128F92C4F9D


In [22]:
#test one load
name, path_feature, schema = datasets[9]
id_name = schema[-1].name

# Load data    
data_0 = spark.read.csv(path_feature, schema = schema).repartition(partitions)
        
data_0 = data_0.withColumn(id_name, F.regexp_replace(F.col(id_name), "'", ""))

# Display
print(name)
print(data_0.count())
data_0.printSchema()
print(data_0.head())
show_as_html(data_0, 5)

msd-rp-v1.0
994188
root
 |-- component_1: double (nullable = true)
 |-- component_2: double (nullable = true)
 |-- component_3: double (nullable = true)
 |-- component_4: double (nullable = true)
 |-- component_5: double (nullable = true)
 |-- component_6: double (nullable = true)
 |-- component_7: double (nullable = true)
 |-- component_8: double (nullable = true)
 |-- component_9: double (nullable = true)
 |-- component_10: double (nullable = true)
 |-- component_11: double (nullable = true)
 |-- component_12: double (nullable = true)
 |-- component_13: double (nullable = true)
 |-- component_14: double (nullable = true)
 |-- component_15: double (nullable = true)
 |-- component_16: double (nullable = true)
 |-- component_17: double (nullable = true)
 |-- component_18: double (nullable = true)
 |-- component_19: double (nullable = true)
 |-- component_20: double (nullable = true)
 |-- component_21: double (nullable = true)
 |-- component_22: double (nullable = true)
 |-- component_23

Row(component_1=0.000732, component_2=0.004374, component_3=0.010033, component_4=0.021959, component_5=0.021077, component_6=0.021095, component_7=0.010749, component_8=0.0085, component_9=0.005567, component_10=0.011657, component_11=0.013513, component_12=0.017738, component_13=0.008703, component_14=0.010784, component_15=0.016256, component_16=0.026867, component_17=0.016416, component_18=0.022224, component_19=0.006332, component_20=0.006577, component_21=0.003075, component_22=0.003139, component_23=0.003051, component_24=0.0, component_25=0.013235, component_26=0.023563, component_27=0.026978, component_28=0.054015, component_29=0.013446, component_30=0.052847, component_31=0.049181, component_32=0.026644, component_33=0.018725, component_34=0.023462, component_35=0.076165, component_36=0.041573, component_37=0.027638, component_38=0.018374, component_39=0.015748, component_40=0.032476, component_41=0.028952, component_42=0.032233, component_43=0.00894, component_44=0.016605, c

Unnamed: 0,component_1,component_2,component_3,component_4,component_5,component_6,component_7,component_8,component_9,component_10,...,component_1432,component_1433,component_1434,component_1435,component_1436,component_1437,component_1438,component_1439,component_1440,instanceName
0,0.01282,0.025459,0.030069,0.014544,0.028225,0.041932,0.031324,0.027404,0.026318,0.013191,...,0.024848,0.047171,0.059207,0.021858,0.026521,0.020713,0.020222,0.01101,0.003796,TRLIWTJ128F428C2B2
1,0.016309,0.00806,0.031449,0.017469,0.012181,0.017402,0.017052,0.013064,0.010149,0.009149,...,0.081111,0.056194,0.080337,0.05489,0.038141,0.033667,0.024478,0.009277,1e-06,TRYGEZW128F147DB6A
2,0.042403,0.024452,0.014649,0.011565,0.013444,0.013876,0.003947,0.004569,0.00674,0.00483,...,0.013483,0.014493,0.013235,0.014287,0.012159,0.008529,0.011702,0.005376,1e-06,TRBDKAN12903CBF568
3,0.004917,0.02596,0.006874,0.025598,0.00781,0.007205,0.006379,0.00937,0.011978,0.00584,...,0.00438,0.003503,0.002986,0.001669,0.001513,0.000916,0.000678,8e-06,1e-06,TRWCGAH128F428E276
4,0.025119,0.032041,0.024421,0.011334,0.026388,0.022804,0.026802,0.019875,0.046378,0.034768,...,0.059956,0.055016,0.047135,0.047023,0.038177,0.037378,0.034569,0.014963,1e-06,TRMCQJG128F42A2716


In [23]:
for name in file_names:

    metadata_schema = StructType([
        StructField("name", StringType()),
        StructField("type", StringType()),
    ])
    metadata = spark.read.csv(f'/data/msd/audio/attributes/{name}.attributes.csv', schema=metadata_schema)

    schema_simple = StructType([
        StructField(f"F{i:03d}", DoubleType(), True) for i in range(0, metadata.count() - 1)
    ] + [
        StructField(f"ID", StringType(), True)
    ])

    data_simple = spark.read.csv(f'/data/msd/audio/features/{name}.csv', schema=schema_simple, quote="'")
    
    print(f"{name:42s} = ({data_simple.count():d}, {len(data_simple.columns) - 1})")

msd-jmir-area-of-moments-all-v1.0          = (994623, 20)
msd-jmir-lpc-all-v1.0                      = (994623, 20)
msd-jmir-methods-of-moments-all-v1.0       = (994623, 10)
msd-jmir-mfcc-all-v1.0                     = (994623, 26)
msd-jmir-spectral-all-all-v1.0             = (994623, 16)
msd-jmir-spectral-derivatives-all-all-v1.0 = (994623, 16)
msd-marsyas-timbral-v1.0                   = (995001, 124)
msd-mvd-v1.0                               = (994188, 420)
msd-rh-v1.0                                = (994188, 60)
msd-rp-v1.0                                = (994188, 1440)
msd-ssd-v1.0                               = (994188, 168)
msd-trh-v1.0                               = (994188, 420)
msd-tssd-v1.0                              = (994188, 1176)


In [24]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()