In [None]:
# import needed library
import re
from collections import defaultdict
import getpass
import pandas as pd
import random
from IPython.display import display, HTML
from matplotlib import pyplot as plt
import numpy as np
import os

# import spark library
import pyspark
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row, DataFrame, Window, functions as F

## Q1 Exploring the Dataset in High-Level

### (a)

In [6]:
# get the file structure of the whole MSD data
!hdfs dfs -ls -R /data/msd > "./Supplementary/data_structure.txt"

In [121]:
# get the content of an example attribute file
!hdfs dfs -cat /data/msd/audio/attributes/msd-tssd-v1.0.attributes.csv

"component_1",NUMERIC
"component_2",NUMERIC
"component_3",NUMERIC
"component_4",NUMERIC
"component_5",NUMERIC
"component_6",NUMERIC
"component_7",NUMERIC
"component_8",NUMERIC
"component_9",NUMERIC
"component_10",NUMERIC
"component_11",NUMERIC
"component_12",NUMERIC
"component_13",NUMERIC
"component_14",NUMERIC
"component_15",NUMERIC
"component_16",NUMERIC
"component_17",NUMERIC
"component_18",NUMERIC
"component_19",NUMERIC
"component_20",NUMERIC
"component_21",NUMERIC
"component_22",NUMERIC
"component_23",NUMERIC
"component_24",NUMERIC
"component_25",NUMERIC
"component_26",NUMERIC
"component_27",NUMERIC
"component_28",NUMERIC
"component_29",NUMERIC
"component_30",NUMERIC
"component_31",NUMERIC
"component_32",NUMERIC
"component_33",NUMERIC
"component_34",NUMERIC
"component_35",NUMERIC
"component_36",NUMERIC
"component_37",NUMERIC
"component_38",NUMERIC
"component_39",NUMERIC
"component_40",NUMERIC
"component_41",NUMERIC
"component_42",NUMERIC
"

In [1]:
# parse the structure file and get the file size of the correspond file. and save to another text file
def parse_ls_line(line):
    pattern = r'^.*?\s+(\d+)\s+\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}\s+(.+)$'
    match = re.match(pattern, line)
    if match:
        size, path = match.groups()
        return {
            'size': int(size),
            'path': path
        }
    return None

def process_ls_output(lines):
    parsed_lines = [parse_ls_line(line) for line in lines if parse_ls_line(line)]
    
    # Group part files by their parent CSV
    parent_to_parts = defaultdict(list)
    for item in parsed_lines:
        if 'part-' in item['path']:
            parent_path = '/'.join(item['path'].split('/')[:-1])
            parent_to_parts[parent_path].append(item)
    
    # Process and combine sizes, removing part files
    result = []
    for item in parsed_lines:
        if item['path'] in parent_to_parts:
            # This is a parent CSV file
            total_size = item['size'] + sum(part['size'] for part in parent_to_parts[item['path']])
            item['size'] = total_size
            result.append(item)
        elif not any(item['path'].startswith(parent) for parent in parent_to_parts):
            # This is not a part file or parent of part files
            result.append(item)
    
    return result

def write_results_to_file(results, filename):
    with open(filename, 'w') as f:
        for item in results:
            f.write(f"{item['size']} {item['path']}\n")


In [16]:
# parse the data structure
with open("../Supplementary/data_structure.txt", 'r') as structure_txt:
    lines = structure_txt.readlines()
    processed = process_ls_output(lines)
    write_results_to_file(processed, "../Supplementary/data_structure_parsed.txt")
    print("Results have been written to processed_output.txt")

Results have been written to processed_output.txt


└── data (dir)
    └── msd (dir)
        ├── audio (dir)
        │   ├── attributes (dir)
        │   │   ├── msd-jmir-area-of-moments-all-v1.0.attributes.csv (1051)
        │   │   ├── msd-jmir-lpc-all-v1.0.attributes.csv (671)
        │   │   ├── msd-jmir-methods-of-moments-all-v1.0.attributes.csv (484)
        │   │   └── ...
        │   ├── features (dir)
        │   │   ├── msd-jmir-area-of-moments-all-v1.0.csv (68704038)
        │   │   ├── msd-jmir-lpc-all-v1.0.csv (55656231)
        │   │   └── ...
        │   └── statistics (dir)
        │       └── sample_properties.csv.gz (42224669)
        ├── genre (dir)
        │   ├── msd-MAGD-genreAssignment.tsv (11625230)
        │   ├── msd-MASD-styleAssignment.tsv (8820054)
        │   └── msd-topMAGD-genreAssignment.tsv (11140605)
        ├── main (dir)
        │   └── summary (dir)
        │       ├── analysis.csv.gz (58658141)
        │       └── metadata.csv.gz (124211304)
        └── tasteprofile (dir)
            ├── mismatches (dir)
            │   ├── sid_matches_manually_accepted.txt (91342)
            │   └── sid_mismatches.txt (2026182)
            └── triplets.tsv (512139195)

In [1]:
# Define some helper functions to make working with Spark easier
def username():
    """Get username with any domain information removed.
    """

    return re.sub('@.*', '', getpass.getuser())


def dict_to_html(d):
    """Convert a Python dictionary into a two column table for display.
    """

    html = []

    html.append(f'<table width="100%" style="width:100%; font-family: monospace;">')
    for k, v in d.items():
        html.append(f'<tr><td style="text-align:left;">{k}</td><td>{v}</td></tr>')
    html.append(f'</table>')

    return ''.join(html)


def show_as_html(df, n=20):
    """Leverage existing pandas jupyter integration to show a spark dataframe as html.
    
    Args:
        n (int): number of rows to show (default: 20)
    """

    display(df.limit(n).toPandas())

    
def display_spark():
    """Display the status of the active Spark session if one is currently running.
    """
    
    if 'spark' in globals() and 'sc' in globals():

        name = sc.getConf().get("spark.app.name")
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:green">active</span></b>, look for <code>{name}</code> under the running applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'<li><a href="{sc.uiWebUrl}" target="_blank">Spark Application UI</a></li>',
            f'</ul>',
            f'<p><b>Config</b></p>',
            dict_to_html(dict(sc.getConf().getAll())),
            f'<p><b>Notes</b></p>',
            f'<ul>',
            f'<li>The spark session <code>spark</code> and spark context <code>sc</code> global variables have been defined by <code>start_spark()</code>.</li>',
            f'<li>Please run <code>stop_spark()</code> before closing the notebook or restarting the kernel or kill <code>{name}</code> by hand using the link in the Spark UI.</li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))
        
    else:
        
        html = [
            f'<p><b>Spark</b></p>',
            f'<p>The spark session is <b><span style="color:red">stopped</span></b>, confirm that <code>{username() + " (jupyter)"}</code> is under the completed applications section in the Spark UI.</p>',
            f'<ul>',
            f'<li><a href="http://mathmadslinux2p.canterbury.ac.nz:8080/" target="_blank">Spark UI</a></li>',
            f'</ul>',
        ]
        display(HTML(''.join(html)))


# Functions to start and stop spark

def start_spark(executor_instances=2, executor_cores=1, worker_memory=1, master_memory=1):
    """Start a new Spark session and define globals for SparkSession (spark) and SparkContext (sc).
    
    Args:
        executor_instances (int): number of executors (default: 2)
        executor_cores (int): number of cores per executor (default: 1)
        worker_memory (float): worker memory (default: 1)
        master_memory (float): master memory (default: 1)
    """

    global spark
    global sc

    user = username()
    
    cores = executor_instances * executor_cores
    partitions = cores * 4
    port = 4000 + random.randint(1, 999)

    spark = (
        SparkSession.builder
        .master("spark://masternode2:7077")
        .config("spark.driver.extraJavaOptions", f"-Dderby.system.home=/tmp/{user}/spark/")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.executor.instances", str(executor_instances))
        .config("spark.executor.cores", str(executor_cores))
        .config("spark.cores.max", str(cores))
        .config("spark.executor.memory", f"{worker_memory}g")
        .config("spark.driver.memory", f"{master_memory}g")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.sql.shuffle.partitions", str(partitions))
        .config("spark.ui.port", str(port))
        .appName(user + " (jupyter)")
        .getOrCreate()
    )
    sc = SparkContext.getOrCreate()
    
    display_spark()

    
def stop_spark():
    """Stop the active Spark session and delete globals for SparkSession (spark) and SparkContext (sc).
    """

    global spark
    global sc

    if 'spark' in globals() and 'sc' in globals():

        spark.stop()

        del spark
        del sc

    display_spark()


# Make css changes to improve spark output readability

html = [
    '<style>',
    'pre { white-space: pre !important; }',
    'table.dataframe td { white-space: nowrap !important; }',
    'table.dataframe thead th:first-child, table.dataframe tbody th { display: none; }',
    '</style>',
]
display(HTML(''.join(html)))

In [3]:
start_spark(executor_instances=2, executor_cores=2, worker_memory=2, master_memory=2)

0,1
spark.dynamicAllocation.enabled,false
spark.executor.memory,2g
spark.ui.port,4491
spark.app.startTime,1727332678651
spark.master,spark://masternode2:7077
spark.app.id,app-20240926183759-0050
spark.executor.id,driver
spark.executor.cores,2
spark.driver.port,43987
spark.driver.host,mathmadslinux2p.canterbury.ac.nz


### (b, c)

In [7]:
# Define function to obtain row count of a file
def obtain_row_count(file_path):
    try:
        # Determine file extension
        _, extension = os.path.splitext(file_path)

        # Set options based on file type
        options = {"header": "false", "inferSchema": "true"}
        
        if extension.strip() in ['.csv', '.gz']:
            options["delimiter"] = ","
        elif extension.strip() == '.tsv':
            options["delimiter"] = "\t"
        elif extension.strip() == '.txt':
            # For txt files, we'll use Spark's text file reader
            df = spark.read.text(file_path)
            return df.count()
        else:
            raise ValueError(f"Unsupported file format: {extension}")
        # Read the file
        df = spark.read.options(**options).csv(file_path.strip(), samplingRatio=0.01)
        show_as_html(df)
        
        # Count rows
        row_count = df.count()
        
        return row_count

    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return None

In [94]:
# Input and output file paths
input_file = "../Supplementary/data_structure_parsed.txt"
output_file = "../Supplementary/file_info.txt"

# Process files and write information
with open(input_file, 'r') as structure_txt, open(output_file, 'w') as output_txt:
    lines = structure_txt.readlines()
    for line in lines:
        parts = line.strip().split(' ', 1)
        if len(parts) == 2 and parts[0] != '0':
            file_size = parts[0]
            file_path = parts[1].strip()
            file_name = os.path.basename(file_path)
            
            try:
                row_count = obtain_row_count(file_path)
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")
                row_count = "N/A"
                
            # Write information to output file
            output_line = f"{file_size} {file_path} {row_count}\n"
            output_txt.write(output_line)
            print(f"Processed: {file_path}")
            

print(f"File information has been saved to {output_file}")

Processed: /data/msd/audio/attributes/msd-jmir-area-of-moments-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-jmir-lpc-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-jmir-methods-of-moments-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-jmir-mfcc-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-jmir-spectral-all-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-jmir-spectral-derivatives-all-all-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-marsyas-timbral-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-mvd-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-rh-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-rp-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-ssd-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-trh-v1.0.attributes.csv
Processed: /data/msd/audio/attributes/msd-tssd-v1.0.attributes.csv
Processed: /data/ms

## Q2 Exploring the Audio Dataset

In [4]:
# Define type mapping for schema
type_mapping = {
    "string": StringType(),
    "real": FloatType(),
    "numeric": DoubleType(),
}

In [5]:
# define a function to generate schema based on the perfix of the feature name
def generate_schema(dataset_prefix):
    attributes_dir = '/data/msd/audio/attributes/'
    
    schema_fields = []
    
    # Construct the full HDFS path
    attribute_file = f"{dataset_prefix}.attributes.csv"
    file_path = f"{attributes_dir}{attribute_file}"
    
    # Use Spark to read the file from HDFS
    try:
        attribute_rdd = spark.sparkContext.textFile(file_path)
        
        # Process each line in the RDD
        for line in attribute_rdd.collect():
            parts = line.strip().split(',')
            
            if len(parts) == 1:
                # If there's only one part, it might be just the attribute name
                name = parts[0]
                attr_type = "string"  # Default to string if type is not specified
            elif len(parts) >= 2:
                name = parts[0]
                attr_type = parts[1].lower()
            else:
                print(f"Warning: Unexpected line format: {line}")
                continue
            
            if attr_type in type_mapping:
                field = StructField(name, type_mapping[attr_type], True)
                schema_fields.append(field)
            else:
                print(f"Warning: Unknown type '{attr_type}' for attribute '{name}'. Using StringType.")
                field = StructField(name, StringType(), True)
                schema_fields.append(field)
        
        return StructType(schema_fields)
    
    except Exception as e:
        print(f"Error reading attribute file: {file_path}")
        print(f"Error details: {str(e)}")
        return None

In [6]:
# Define test function to print the schema
def print_schema(schema: StructType):
    if not schema:
        print("Schema is None or empty")
        return
    
    print("Schema:")
    for field in schema.fields:
        print(f"- {field.name}: {field.dataType}")

In [7]:
# iterate through all the feature files and check the schema for each of them
input_file = "../Supplementary/data_structure_parsed.txt"
feature_prefix = []
with open(input_file, 'r') as data_structure:
    lines = data_structure.readlines()
    for line in lines:
        path = line.split(' ')[1]
        if path.startswith('/data/msd/audio/features/'):
            feature_prefix.append(os.path.splitext(os.path.basename(path).strip())[0])
print(feature_prefix)
for prefix in feature_prefix:
    struct_type = generate_schema(prefix)
    print_schema(struct_type)

['msd-jmir-area-of-moments-all-v1.0', 'msd-jmir-lpc-all-v1.0', 'msd-jmir-methods-of-moments-all-v1.0', 'msd-jmir-mfcc-all-v1.0', 'msd-jmir-spectral-all-all-v1.0', 'msd-jmir-spectral-derivatives-all-all-v1.0', 'msd-marsyas-timbral-v1.0', 'msd-mvd-v1.0', 'msd-rh-v1.0', 'msd-rp-v1.0', 'msd-ssd-v1.0', 'msd-trh-v1.0', 'msd-tssd-v1.0']
Schema:
- Area_Method_of_Moments_Overall_Standard_Deviation_1: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_2: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_3: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_4: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_5: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_6: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_7: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_8: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_9: FloatType
- Area_Method_of_Moments_Overall_Standard_Deviation_10: FloatType
- Area_Me

Schema:
- "component_0": DoubleType
- "component_1": DoubleType
- "component_2": DoubleType
- "component_3": DoubleType
- "component_4": DoubleType
- "component_5": DoubleType
- "component_6": DoubleType
- "component_7": DoubleType
- "component_8": DoubleType
- "component_9": DoubleType
- "component_10": DoubleType
- "component_11": DoubleType
- "component_12": DoubleType
- "component_13": DoubleType
- "component_14": DoubleType
- "component_15": DoubleType
- "component_16": DoubleType
- "component_17": DoubleType
- "component_18": DoubleType
- "component_19": DoubleType
- "component_20": DoubleType
- "component_21": DoubleType
- "component_22": DoubleType
- "component_23": DoubleType
- "component_24": DoubleType
- "component_25": DoubleType
- "component_26": DoubleType
- "component_27": DoubleType
- "component_28": DoubleType
- "component_29": DoubleType
- "component_30": DoubleType
- "component_31": DoubleType
- "component_32": DoubleType
- "component_33": DoubleType
- "component_34"

Schema:
- "component_1": DoubleType
- "component_2": DoubleType
- "component_3": DoubleType
- "component_4": DoubleType
- "component_5": DoubleType
- "component_6": DoubleType
- "component_7": DoubleType
- "component_8": DoubleType
- "component_9": DoubleType
- "component_10": DoubleType
- "component_11": DoubleType
- "component_12": DoubleType
- "component_13": DoubleType
- "component_14": DoubleType
- "component_15": DoubleType
- "component_16": DoubleType
- "component_17": DoubleType
- "component_18": DoubleType
- "component_19": DoubleType
- "component_20": DoubleType
- "component_21": DoubleType
- "component_22": DoubleType
- "component_23": DoubleType
- "component_24": DoubleType
- "component_25": DoubleType
- "component_26": DoubleType
- "component_27": DoubleType
- "component_28": DoubleType
- "component_29": DoubleType
- "component_30": DoubleType
- "component_31": DoubleType
- "component_32": DoubleType
- "component_33": DoubleType
- "component_34": DoubleType
- "component_35

Schema:
- "component_0": DoubleType
- "component_1": DoubleType
- "component_2": DoubleType
- "component_3": DoubleType
- "component_4": DoubleType
- "component_5": DoubleType
- "component_6": DoubleType
- "component_7": DoubleType
- "component_8": DoubleType
- "component_9": DoubleType
- "component_10": DoubleType
- "component_11": DoubleType
- "component_12": DoubleType
- "component_13": DoubleType
- "component_14": DoubleType
- "component_15": DoubleType
- "component_16": DoubleType
- "component_17": DoubleType
- "component_18": DoubleType
- "component_19": DoubleType
- "component_20": DoubleType
- "component_21": DoubleType
- "component_22": DoubleType
- "component_23": DoubleType
- "component_24": DoubleType
- "component_25": DoubleType
- "component_26": DoubleType
- "component_27": DoubleType
- "component_28": DoubleType
- "component_29": DoubleType
- "component_30": DoubleType
- "component_31": DoubleType
- "component_32": DoubleType
- "component_33": DoubleType
- "component_34"

- "component_698": DoubleType
- "component_699": DoubleType
- "component_700": DoubleType
- "component_701": DoubleType
- "component_702": DoubleType
- "component_703": DoubleType
- "component_704": DoubleType
- "component_705": DoubleType
- "component_706": DoubleType
- "component_707": DoubleType
- "component_708": DoubleType
- "component_709": DoubleType
- "component_710": DoubleType
- "component_711": DoubleType
- "component_712": DoubleType
- "component_713": DoubleType
- "component_714": DoubleType
- "component_715": DoubleType
- "component_716": DoubleType
- "component_717": DoubleType
- "component_718": DoubleType
- "component_719": DoubleType
- "component_720": DoubleType
- "component_721": DoubleType
- "component_722": DoubleType
- "component_723": DoubleType
- "component_724": DoubleType
- "component_725": DoubleType
- "component_726": DoubleType
- "component_727": DoubleType
- "component_728": DoubleType
- "component_729": DoubleType
- "component_730": DoubleType
- "compone

In [1]:
stop_spark()

NameError: name 'stop_spark' is not defined