# Data processing

## Q1(a) 

In [1]:
# Run this cell to import pyspark and to define start_spark() and stop_spark()

import findspark

findspark.init()

import getpass
import pandas
import pyspark
import random
import re

from IPython.display import display, HTML
from pyspark import SparkContext
from pyspark.sql import SparkSession


# Functions used below
def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.

    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

   
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark
def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()
 
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()

# Make css changes to improve spark output readability
html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [2]:
# Run this cell to start a spark session in this notebook

start_spark(executor_instances=4, executor_cores=2, worker_memory=4, master_memory=4)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.instances,4
spark.app.id,app-20240520181739-1384
spark.driver.port,33277
spark.app.startTime,1716185858514
spark.driver.memory,4g
spark.executor.memory,4g
spark.master,spark://masternode2:7077
spark.executor.id,driver
spark.ui.port,4082


In [3]:
# Show the data structure of msd
!hdfs dfs -ls /data/msd
!hdfs dfs -du -h /data/msd

Found 4 items
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/genre
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:28 /data/msd/main
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/tasteprofile
12.3 G   98.1 G   /data/msd/audio
30.1 M   241.0 M  /data/msd/genre
174.4 M  1.4 G    /data/msd/main
490.4 M  3.8 G    /data/msd/tasteprofile


In [4]:
# Show the data structure of audio  
!hdfs dfs -ls /data/msd/audio
!hdfs dfs -du -h /data/msd/audio

Found 3 items
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/attributes
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/features
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:28 /data/msd/audio/statistics
103.0 K  824.3 K  /data/msd/audio/attributes
12.2 G   97.8 G   /data/msd/audio/features
40.3 M   322.1 M  /data/msd/audio/statistics


In [5]:
# Show the data structure of audio subdirectory 
!hdfs dfs -ls /data/msd/audio/attributes
!hdfs dfs -du -h /data/msd/audio/attributes

Found 13 items
-rwxr-xr-x   8 jsw93 supergroup       1051 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-area-of-moments-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup        671 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-lpc-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup        484 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-methods-of-moments-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup        898 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-mfcc-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup        777 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-spectral-all-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup        777 2021-09-29 10:35 /data/msd/audio/attributes/msd-jmir-spectral-derivatives-all-all-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup      12317 2021-09-29 10:35 /data/msd/audio/attributes/msd-marsyas-timbral-v1.0.attributes.csv
-rwxr-xr-x   8 jsw93 supergroup       9990 2021-09-29 10

In [6]:
# Show the data structure of audio subdirectory 
!hdfs dfs -ls /data/msd/audio/features
!hdfs dfs -du -h /data/msd/audio/features

Found 13 items
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:31 /data/msd/audio/features/msd-jmir-area-of-moments-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:33 /data/msd/audio/features/msd-jmir-lpc-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/features/msd-jmir-methods-of-moments-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/features/msd-jmir-mfcc-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:33 /data/msd/audio/features/msd-jmir-spectral-all-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/features/msd-jmir-spectral-derivatives-all-all-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/audio/features/msd-marsyas-timbral-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:29 /data/msd/audio/features/msd-mvd-v1.0.csv
drwxr-xr-x   - jsw93 supergroup          0 20

In [7]:
# Show the data structure of audio subdirectory 
!hdfs dfs -ls /data/msd/audio/statistics
!hdfs dfs -du -h /data/msd/audio/statistics

Found 1 items
-rwxr-xr-x   8 jsw93 supergroup   42224669 2021-09-29 10:28 /data/msd/audio/statistics/sample_properties.csv.gz
40.3 M  322.1 M  /data/msd/audio/statistics/sample_properties.csv.gz


In [8]:
# Show the data structure of genre  
!hdfs dfs -ls /data/msd/genre
!hdfs dfs -du -h /data/msd/genre

Found 3 items
-rwxr-xr-x   8 jsw93 supergroup   11625230 2021-09-29 10:35 /data/msd/genre/msd-MAGD-genreAssignment.tsv
-rwxr-xr-x   8 jsw93 supergroup    8820054 2021-09-29 10:35 /data/msd/genre/msd-MASD-styleAssignment.tsv
-rwxr-xr-x   8 jsw93 supergroup   11140605 2021-09-29 10:35 /data/msd/genre/msd-topMAGD-genreAssignment.tsv
11.1 M  88.7 M  /data/msd/genre/msd-MAGD-genreAssignment.tsv
8.4 M   67.3 M  /data/msd/genre/msd-MASD-styleAssignment.tsv
10.6 M  85.0 M  /data/msd/genre/msd-topMAGD-genreAssignment.tsv


In [9]:
# Show the data structure of main  
!hdfs dfs -ls /data/msd/main
!hdfs dfs -du -h /data/msd/main/
!hdfs dfs -ls /data/msd/main/summary
!hdfs dfs -du -h /data/msd/main/summary

Found 1 items
drwxr-xr-x   - jsw93 supergroup          0 2022-05-23 12:11 /data/msd/main/summary
174.4 M  1.4 G  /data/msd/main/summary
Found 2 items
-rwxr-xr-x   8 jsw93 supergroup   58658141 2021-09-29 10:28 /data/msd/main/summary/analysis.csv.gz
-rwxr-xr-x   8 jsw93 supergroup  124211304 2021-09-29 10:28 /data/msd/main/summary/metadata.csv.gz
55.9 M   447.5 M  /data/msd/main/summary/analysis.csv.gz
118.5 M  947.7 M  /data/msd/main/summary/metadata.csv.gz


In [10]:
# Show the data structure of tasteprofile  
!hdfs dfs -ls /data/msd/tasteprofile/
!hdfs dfs -du -h /data/msd/tasteprofile

!hdfs dfs -ls /data/msd/tasteprofile/mismatches

Found 2 items
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/tasteprofile/mismatches
drwxr-xr-x   - jsw93 supergroup          0 2021-09-29 10:35 /data/msd/tasteprofile/triplets.tsv
2.0 M    16.2 M  /data/msd/tasteprofile/mismatches
488.4 M  3.8 G   /data/msd/tasteprofile/triplets.tsv
Found 2 items
-rwxr-xr-x   8 jsw93 supergroup      91342 2021-09-29 10:35 /data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt
-rwxr-xr-x   8 jsw93 supergroup    2026182 2021-09-29 10:35 /data/msd/tasteprofile/mismatches/sid_mismatches.txt


## Q1(b)

In [11]:
# Import modules from pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract

# Initialize the SparkSession
spark = SparkSession.builder.appName('Rows Count').getOrCreate()

In [12]:
# Count the number of row for audio-attributes
audio_attributes_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .load('hdfs:///data/msd/audio/attributes/*.csv')
)

audio_attributes_df.show(10)

audio_attributes_rows = audio_attributes_df.count()
print(audio_attributes_rows)

+------------+-------+
|         _c0|    _c1|
+------------+-------+
| component_1|NUMERIC|
| component_2|NUMERIC|
| component_3|NUMERIC|
| component_4|NUMERIC|
| component_5|NUMERIC|
| component_6|NUMERIC|
| component_7|NUMERIC|
| component_8|NUMERIC|
| component_9|NUMERIC|
|component_10|NUMERIC|
+------------+-------+
only showing top 10 rows

3929


In [13]:
# Count the number of row for audio-features
audio_features_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .load('hdfs:///data/msd/audio/features/*.csv')
)

audio_features_df.show(10)

audio_features_rows = audio_features_df.count()
print(audio_features_rows)

+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--

12927867


In [14]:
# Count the number of row for audio-statistics
audio_statistics_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('hdfs:///data/msd/audio/statistics/*.csv.gz')
)

audio_statistics_df.show(10)
                         
audio_statistics_rows = audio_statistics_df.count()
print(audio_statistics_rows)

+------------------+--------------------+--------------------+---------+----------+--------------+-------------+-----------+-----------+--------------+--------+
|          track_id|               title|         artist_name| duration|7digita_Id|sample_bitrate|sample_length|sample_rate|sample_mode|sample_version|filesize|
+------------------+--------------------+--------------------+---------+----------+--------------+-------------+-----------+-----------+--------------+--------+
|TRMMMYQ128F932D901|        Silent Night|    Faster Pussy cat|252.05506|   7032331|           128|60.1935770567|      22050|          1|             2|  960887|
|TRMMMKD128F425225D|         Tanssi vaan|    Karkkiautomaatti|156.55138|   1514808|            64|30.2244270016|      22050|          1|             2|  242038|
|TRMMMRX128F93187D9|   No One Could Ever|      Hudson Mohawke|138.97098|   6945353|           128|60.1935770567|      22050|          1|             2|  960887|
|TRMMMCH128F425532C|       Si Vos 

In [15]:
# Count the number of row for genre
genre_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .option('sep', '\t')
    .load('hdfs:///data/msd/genre/*.tsv')
)

genre_df.show(10, truncate=False)

genre_rows = genre_df.count()
print(genre_rows)

+------------------+--------------+
|_c0               |_c1           |
+------------------+--------------+
|TRAAAAK128F9318786|Pop_Rock      |
|TRAAAAV128F421A322|Pop_Rock      |
|TRAAAAW128F429D538|Rap           |
|TRAAABD128F429CF47|Pop_Rock      |
|TRAAACV128F423E09E|Pop_Rock      |
|TRAAADT12903CCC339|Easy_Listening|
|TRAAAED128E0783FAB|Vocal         |
|TRAAAEF128F4273421|Pop_Rock      |
|TRAAAEM128F93347B9|Electronic    |
|TRAAAFD128F92F423A|Pop_Rock      |
+------------------+--------------+
only showing top 10 rows

1103077


In [16]:
# Count the number of row for main
main_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('sep', ',')
    .load('hdfs:///data/msd/main/summary/*.csv.gz')
)

main_df.show(10)

main_rows = main_df.count()
print(main_rows)

# Count the number of row of unique songs
unique_main_rows = (
    main_df
    .select('song_id')
    .distinct()
    .count()
)
print(unique_main_rows)

+----------------+-----------------+-------------------+-------------------+------------------+---------------+--------------------+----------------+--------------------+--------------------+---------------+-----+----------------+-------------------+--------------------+------------------+------------------+------------------+--------------------+----------------+
|analyzer_version|artist_7digitalid| artist_familiarity|  artist_hotttnesss|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_mbid|         artist_name|artist_playmeid|genre|idx_artist_terms|idx_similar_artists|             release|release_7digitalid|   song_hotttnesss|           song_id|               title|track_7digitalid|
+----------------+-----------------+-------------------+-------------------+------------------+---------------+--------------------+----------------+--------------------+--------------------+---------------+-----+----------------+-------------------+----------------

In [17]:
# Count the number of row for tasteprofile-mismatches
taste_mismatches_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .load('hdfs:///data/msd/tasteprofile/mismatches/*.txt')
)
taste_mismatches_df.show(10, truncate=False)

# Extract song_id and creat a new dataframe
new_mismatches_df = taste_mismatches_df.select(
    regexp_extract('_c0', r'<(\w+) (\w+)>', 1).alias('song_id'),
    regexp_extract('_c0', r'<\w+ (\w+)>', 1).alias('track_id'),
    regexp_extract('_c0', r'> (.+?) - (.+?) !=', 1).alias('artist1_id'),
    regexp_extract('_c0', r'> (.+?) - (.+?) !=', 2).alias('title1_id'),
    regexp_extract('_c0', r'!= (.+?) - (.+?)$', 1).alias('artist2_id'),
    regexp_extract('_c0', r'!= (.+?) - (.+?)$', 2).alias('title2_id')
)
new_mismatches_df.show(10, truncate=False)

new_mismatches_rows = new_mismatches_df.count()
print(new_mismatches_rows)

# Count the number of row of unique songs
unique_new_mismatches_rows = (
    new_mismatches_df
    .select('song_id')
    .distinct()
    .count()
)
print(unique_new_mismatches_rows)

+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|ERROR: <SOUMNSI12AB0182807 TRMMGKQ128F9325E10> Digital Underground  -  The Way We Swing  !=  Linkwood  -  Whats up with the Underground              |
|ERROR: <SOCMRBE12AB018C546 TRMMREB12903CEB1B1> Jimmy Reed  -  The Sun Is Shining (Digitally Remastered)  !=  Slim Harpo  -  I Got Love If You Want It|
|ERROR: <SOLPHZY12AC468ABA8 TRMMBOC12903CEB46E> Africa HiTech  -  Footstep  !=  Marcus Worgull  -  Drumstern (BONUS TRACK)                            |
|ERROR: <SONGHTM12A8C1374EF TRMMITP128F425D8D0> Death in Vegas  -  Anita Berber  !=  Val

In [18]:
# Count the number of row for tasteprofile-triplets
taste_triplets_df = (
    spark.read.format('com.databricks.spark.csv')
    .option('header', 'false')
    .option('inferSchema', 'true')
    .option('sep', '\t')
    .load('hdfs:///data/msd/tasteprofile/triplets.tsv')
    .toDF('user_id', 'song_id', 'play_count')
)

taste_triplets_df.show(10, truncate=False)

taste_triplets_rows = taste_triplets_df.count()
print(taste_triplets_rows)

# Count the number of row of unique songs
unique_taste_triplets_rows = (
    taste_triplets_df
    .select('song_id')
    .distinct()
    .count()
)
print(unique_taste_triplets_rows)

+----------------------------------------+------------------+----------+
|user_id                                 |song_id           |play_count|
+----------------------------------------+------------------+----------+
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOQEFDN12AB017C52B|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOQOIUJ12A6701DAA7|2         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOQOKKD12A6701F92E|4         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOSDVHO12AB01882C7|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOSKICX12A6701F932|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOSNUPV12A8C13939B|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOSVMII12A6701F92D|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOTUNHI12B0B80AFE2|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOTXLTZ12AB017C535|1         |
|f1bfc2a4597a3642f232e7a4e5d5ab2a99cf80e5|SOTZDDX12A6701F935|1         |
+----------------------------------------+---------

## Q2(a) 

In [19]:
# Load mismatch lists
mismatch_path = "hdfs:///data/msd/tasteprofile/mismatches/sid_matches_manually_accepted.txt"
manual_accepted_path = "hdfs:///data/msd/tasteprofile/mismatches/sid_mismatches.txt"
mismatch_df = spark.read.format('csv').option('header', 'false').load(mismatch_path)
manual_accepted_df = spark.read.format('csv').option('header', 'false').load(manual_accepted_path)

# Extract song_id and track_id 
mismatch_df = mismatch_df.select(
    regexp_extract('_c0', r'<(\w+)', 1).alias('song_id'),
    regexp_extract('_c0', r'(\w+)>', 1).alias('track_id'),
)
manual_accepted_df = manual_accepted_df.select(
    regexp_extract('_c0', r'<(\w+)', 1).alias('song_id'),
    regexp_extract('_c0', r'(\w+)>', 1).alias('track_id'),
)

mismatch_df.show(10, truncate=False)
manual_accepted_df.show(10, truncate=False)

# Remove automatically identified mismatches
filtered_df = taste_triplets_df.join(mismatch_df, on='song_id', how='left_anti')

# Remove manually accepted mismatches
filtered_df = filtered_df.join(manual_accepted_df, on='song_id', how='left_anti')
filtered_df.show(10, truncate=False)
filtered_rows = filtered_df.count()
print(filtered_rows)

+------------------+------------------+
|song_id           |track_id          |
+------------------+------------------+
|                  |                  |
|SOFQHZM12A8C142342|TRMWMFG128F92FFEF2|
|                  |                  |
|SODXUTF12AB018A3DA|TRMWPCD12903CCE5ED|
|                  |                  |
|SOASCRF12A8C1372E6|TRMHIPJ128F426A2E2|
|                  |                  |
|SOITDUN12A58A7AACA|TRMHXGK128F42446AB|
|                  |                  |
|SOLZXUM12AB018BE39|TRMRSOF12903CCF516|
+------------------+------------------+
only showing top 10 rows

+------------------+------------------+
|song_id           |track_id          |
+------------------+------------------+
|SOUMNSI12AB0182807|TRMMGKQ128F9325E10|
|SOCMRBE12AB018C546|TRMMREB12903CEB1B1|
|SOLPHZY12AC468ABA8|TRMMBOC12903CEB46E|
|SONGHTM12A8C1374EF|TRMMITP128F425D8D0|
|SONGXCA12A8C13E82E|TRMMAYZ128F429ECE6|
|SOMBCRC12A67ADA435|TRMMNVU128EF343EED|
|SOTDWDK12A8C13617B|TRMMNCZ128F426FF0E|
|SOEBURP12AB01

## Q2(b)

In [20]:
# Import modules from pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col

# Initialize the SparkSession
spark = SparkSession.builder.appName('Creat Schema').getOrCreate()

# Define a mapping
type_mapping = {
    'real': FloatType(),
    'NUMERIC': FloatType(),
    'INTEGER': IntegerType(),
    'STRING': StringType()
}

# Load attributes file
attributes_file = [
    'hdfs:///data/msd/audio/attributes/msd-jmir-area-of-moments-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-jmir-lpc-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-jmir-methods-of-moments-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-jmir-mfcc-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-jmir-spectral-all-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-jmir-spectral-derivatives-all-all-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-marsyas-timbral-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-mvd-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-rh-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-rp-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-ssd-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-trh-v1.0.attributes.csv',
    'hdfs:///data/msd/audio/attributes/msd-tssd-v1.0.attributes.csv'
]

# Initialize an empty list
schemas = []

# Iterate over each file
for attr_file in attributes_file:
    attributes_df = spark.read.format('csv').option('header', 'false').load(attr_file)
    attributes_df = attributes_df.select(
        col('_c0').alias('name'),
        col('_c1').alias('type')
    )
    attributes_df.show(5, truncate=False)
    
    # Automatically creat StructType for each file
    schema = StructType([
        StructField(row['name'], type_mapping.get(row['type'], StringType()), True)
        for row in attributes_df.collect()
    ])
    
    schemas.append(schema)
    print(attr_file)
    print(schema)

# Load features file
features_file_paths = [
    'hdfs:///data/msd/audio/features/msd-jmir-area-of-moments-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-jmir-lpc-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-jmir-methods-of-moments-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-jmir-mfcc-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-jmir-spectral-all-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-jmir-spectral-derivatives-all-all-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-marsyas-timbral-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-mvd-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-rh-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-rp-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-ssd-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-trh-v1.0.csv',
    'hdfs:///data/msd/audio/features/msd-tssd-v1.0.csv'
]

# Initialize an empty list
features_dfs = []

# Iterate over each file
for (features_file, schema) in zip(features_file_paths, schemas):
    print(features_file)
    audio_features_df = spark.read.schema(schema).csv(features_file)
    audio_features_df.show(5, truncate=False)
    
    features_dfs.append((features_file, audio_features_df))
    audio_features_rows = audio_features_df.count()
    print(features_file)
    print(audio_features_rows)

+---------------------------------------------------+----+
|name                                               |type|
+---------------------------------------------------+----+
|Area_Method_of_Moments_Overall_Standard_Deviation_1|real|
|Area_Method_of_Moments_Overall_Standard_Deviation_2|real|
|Area_Method_of_Moments_Overall_Standard_Deviation_3|real|
|Area_Method_of_Moments_Overall_Standard_Deviation_4|real|
|Area_Method_of_Moments_Overall_Standard_Deviation_5|real|
+---------------------------------------------------+----+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-jmir-area-of-moments-all-v1.0.attributes.csv
StructType(List(StructField(Area_Method_of_Moments_Overall_Standard_Deviation_1,FloatType,true),StructField(Area_Method_of_Moments_Overall_Standard_Deviation_2,FloatType,true),StructField(Area_Method_of_Moments_Overall_Standard_Deviation_3,FloatType,true),StructField(Area_Method_of_Moments_Overall_Standard_Deviation_4,FloatType,true),StructField(Area_Method_o

+---------------------------------------------------+----+
|name                                               |type|
+---------------------------------------------------+----+
|Spectral_Centroid_Overall_Standard_Deviation_1     |real|
|Spectral_Rolloff_Point_Overall_Standard_Deviation_1|real|
|Spectral_Flux_Overall_Standard_Deviation_1         |real|
|Compactness_Overall_Standard_Deviation_1           |real|
|Spectral_Variability_Overall_Standard_Deviation_1  |real|
+---------------------------------------------------+----+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-jmir-spectral-derivatives-all-all-v1.0.attributes.csv
StructType(List(StructField(Spectral_Centroid_Overall_Standard_Deviation_1,FloatType,true),StructField(Spectral_Rolloff_Point_Overall_Standard_Deviation_1,FloatType,true),StructField(Spectral_Flux_Overall_Standard_Deviation_1,FloatType,true),StructField(Compactness_Overall_Standard_Deviation_1,FloatType,true),StructField(Spectral_Variability_Overall_

+-----------+-------+
|name       |type   |
+-----------+-------+
|component_0|NUMERIC|
|component_1|NUMERIC|
|component_2|NUMERIC|
|component_3|NUMERIC|
|component_4|NUMERIC|
+-----------+-------+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-mvd-v1.0.attributes.csv
StructType(List(StructField(component_0,FloatType,true),StructField(component_1,FloatType,true),StructField(component_2,FloatType,true),StructField(component_3,FloatType,true),StructField(component_4,FloatType,true),StructField(component_5,FloatType,true),StructField(component_6,FloatType,true),StructField(component_7,FloatType,true),StructField(component_8,FloatType,true),StructField(component_9,FloatType,true),StructField(component_10,FloatType,true),StructField(component_11,FloatType,true),StructField(component_12,FloatType,true),StructField(component_13,FloatType,true),StructField(component_14,FloatType,true),StructField(component_15,FloatType,true),StructField(component_16,FloatType,true),StructField(

+-----------+-------+
|name       |type   |
+-----------+-------+
|component_0|NUMERIC|
|component_1|NUMERIC|
|component_2|NUMERIC|
|component_3|NUMERIC|
|component_4|NUMERIC|
+-----------+-------+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-rh-v1.0.attributes.csv
StructType(List(StructField(component_0,FloatType,true),StructField(component_1,FloatType,true),StructField(component_2,FloatType,true),StructField(component_3,FloatType,true),StructField(component_4,FloatType,true),StructField(component_5,FloatType,true),StructField(component_6,FloatType,true),StructField(component_7,FloatType,true),StructField(component_8,FloatType,true),StructField(component_9,FloatType,true),StructField(component_10,FloatType,true),StructField(component_11,FloatType,true),StructField(component_12,FloatType,true),StructField(component_13,FloatType,true),StructField(component_14,FloatType,true),StructField(component_15,FloatType,true),StructField(component_16,FloatType,true),StructField(c

+-----------+-------+
|name       |type   |
+-----------+-------+
|component_0|NUMERIC|
|component_1|NUMERIC|
|component_2|NUMERIC|
|component_3|NUMERIC|
|component_4|NUMERIC|
+-----------+-------+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-ssd-v1.0.attributes.csv
StructType(List(StructField(component_0,FloatType,true),StructField(component_1,FloatType,true),StructField(component_2,FloatType,true),StructField(component_3,FloatType,true),StructField(component_4,FloatType,true),StructField(component_5,FloatType,true),StructField(component_6,FloatType,true),StructField(component_7,FloatType,true),StructField(component_8,FloatType,true),StructField(component_9,FloatType,true),StructField(component_10,FloatType,true),StructField(component_11,FloatType,true),StructField(component_12,FloatType,true),StructField(component_13,FloatType,true),StructField(component_14,FloatType,true),StructField(component_15,FloatType,true),StructField(component_16,FloatType,true),StructField(

+-----------+-------+
|name       |type   |
+-----------+-------+
|component_1|NUMERIC|
|component_2|NUMERIC|
|component_3|NUMERIC|
|component_4|NUMERIC|
|component_5|NUMERIC|
+-----------+-------+
only showing top 5 rows

hdfs:///data/msd/audio/attributes/msd-tssd-v1.0.attributes.csv
StructType(List(StructField(component_1,FloatType,true),StructField(component_2,FloatType,true),StructField(component_3,FloatType,true),StructField(component_4,FloatType,true),StructField(component_5,FloatType,true),StructField(component_6,FloatType,true),StructField(component_7,FloatType,true),StructField(component_8,FloatType,true),StructField(component_9,FloatType,true),StructField(component_10,FloatType,true),StructField(component_11,FloatType,true),StructField(component_12,FloatType,true),StructField(component_13,FloatType,true),StructField(component_14,FloatType,true),StructField(component_15,FloatType,true),StructField(component_16,FloatType,true),StructField(component_17,FloatType,true),StructFiel

+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+----------------------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+----------------------------------------+-----------------------------------------+--------------------+
|Area_Method_of_Moments_Overall_Standard_Devi

hdfs:///data/msd/audio/features/msd-jmir-area-of-moments-all-v1.0.csv
994623
hdfs:///data/msd/audio/features/msd-jmir-lpc-all-v1.0.csv
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+---------------------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+----------------------+--------------------+
|LPC_Overall_Standard_Deviation_1|LPC_Overall_Standard_Deviation_2|LPC_Overall_Standard_Deviation_3|LPC_Overall_Standard_Deviation_4|LPC_Overall_Standard_Deviation_5|LPC_Overall_Standard_Deviation_6|LPC_Overall_Standard_Deviation_7|LPC_Overall_Standard_Deviation_8|LPC_Overall_Standard_Devi

hdfs:///data/msd/audio/features/msd-jmir-methods-of-moments-all-v1.0.csv
994623
hdfs:///data/msd/audio/features/msd-jmir-mfcc-all-v1.0.csv
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+----------------------------------+----------------------------------+----------------------------------+----------------------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+--------------------+
|MFCC_Overall_Standard_Deviation_1|MFCC_Overall_Standard_Deviation_2|MFCC_Overall_Standar

hdfs:///data/msd/audio/features/msd-jmir-spectral-all-all-v1.0.csv
994623
hdfs:///data/msd/audio/features/msd-jmir-spectral-derivatives-all-all-v1.0.csv
+----------------------------------------------+---------------------------------------------------+------------------------------------------+----------------------------------------+-------------------------------------------------+---------------------------------------------+-----------------------------------------------------------+-------------------------------------------+-----------------------------------+----------------------------------------+-------------------------------+-----------------------------+--------------------------------------+----------------------------------+------------------------------------------------+--------------------------------+--------------------+
|Spectral_Centroid_Overall_Standard_Deviation_1|Spectral_Rolloff_Point_Overall_Standard_Deviation_1|Spectral_Flux_Overall_Standard_Deviation_1|Com

hdfs:///data/msd/audio/features/msd-marsyas-timbral-v1.0.csv
995001
hdfs:///data/msd/audio/features/msd-mvd-v1.0.csv
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------

hdfs:///data/msd/audio/features/msd-mvd-v1.0.csv
994188
hdfs:///data/msd/audio/features/msd-rh-v1.0.csv
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------------------+
|component_0|component_1|component_2|component_3|component_4|component_5|component_6|component_7|compon

hdfs:///data/msd/audio/features/msd-rp-v1.0.csv
994188
hdfs:///data/msd/audio/features/msd-ssd-v1.0.csv
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------

hdfs:///data/msd/audio/features/msd-ssd-v1.0.csv
994188
hdfs:///data/msd/audio/features/msd-trh-v1.0.csv
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------

hdfs:///data/msd/audio/features/msd-trh-v1.0.csv
994188
hdfs:///data/msd/audio/features/msd-tssd-v1.0.csv
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-----

hdfs:///data/msd/audio/features/msd-tssd-v1.0.csv
994188


In [21]:
# Run this cell before closing the notebook or kill your spark application by hand using the link in the Spark UI

stop_spark()