In [1]:
### Notebook Initialization – Objective & Imports

# Objective
# - Process the 10,000-song subset using PySpark.
# - Aggregate time-series features (mean, max, min, std).
# - Store the processed data in Parquet format.
# - (Optional) Push data to MongoDB Atlas.

# Import Libraries
import os
import glob
import h5py
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, min, max
from pymongo import MongoClient

print("Libraries imported successfully")

Libraries imported successfully


In [2]:
from pyspark import SparkConf

# Stop existing Spark session if running
try:
    spark.stop()
except:
    pass

# Reconfigure Spark for CPU usage
conf = SparkConf()
conf.set("spark.executor.memory", "24g")  # Maximize memory usage for large data
conf.set("spark.driver.memory", "24g")
conf.set("spark.executor.cores", "4")  # Utilize all 6 CPU cores
conf.set("spark.task.cpus", "1")       # 1 CPU core per task

# Reinitialize Spark session
spark = SparkSession.builder.config(conf=conf).appName("MSD_DataPrep_CPU").getOrCreate()

print("Spark session reconfigured for CPU processing.")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/12 11:43:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session reconfigured for CPU processing.


In [3]:
### Path Definitions

INPUT_PATH = "/kaggle/input/millionsongsubset/MillionSongSubset/*/*/*/*.h5"

# MongoDB Connection String (Optional)
MONGO_URI = "mongodb+srv://admin:yourpassword123@bigdatahw.udemiib.mongodb.net/?retryWrites=true&w=majority&appName=BigDataHW"

print("Paths defined successfully")

Paths defined successfully


In [4]:
### MongoDB Connection (Optional)

# Test MongoDB Connection
try:
    client = MongoClient(MONGO_URI)
    db = client["msd_database"]
    collection = db["test_collection"]

    # Insert sample document
    sample_doc = {"status": "Connection successful"}
    collection.insert_one(sample_doc)

    print("MongoDB connection and sample insertion successful")

except Exception as e:
    print(f"MongoDB connection failed: {e}")

MongoDB connection and sample insertion successful


In [5]:
### Data Loading and Exploration - Inspecting HDF5 Structure

# Select a sample file to inspect
sample_file = glob.glob(INPUT_PATH)[0]

# Open the file and display its structure
with h5py.File(sample_file, 'r') as hdf:
    def print_structure(name, obj):
        print(f"{name}: {type(obj)}")
    
    hdf.visititems(print_structure)

print("Sample HDF5 file structure displayed successfully")

analysis: <class 'h5py._hl.group.Group'>
analysis/bars_confidence: <class 'h5py._hl.dataset.Dataset'>
analysis/bars_start: <class 'h5py._hl.dataset.Dataset'>
analysis/beats_confidence: <class 'h5py._hl.dataset.Dataset'>
analysis/beats_start: <class 'h5py._hl.dataset.Dataset'>
analysis/sections_confidence: <class 'h5py._hl.dataset.Dataset'>
analysis/sections_start: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_confidence: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_loudness_max: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_loudness_max_time: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_loudness_start: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_pitches: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_start: <class 'h5py._hl.dataset.Dataset'>
analysis/segments_timbre: <class 'h5py._hl.dataset.Dataset'>
analysis/songs: <class 'h5py._hl.dataset.Dataset'>
analysis/tatums_confidence: <class 'h5py._hl.dataset.Dataset'>
analysis/tatums_start: <clas

In [6]:
from pyspark.sql import Row
import numpy as np

import unicodedata
import re

def sanitize_artist_name(artist_name):
    """
    Sanitize artist name:
    - Apply Unicode normalization (NFKD) to remove accents.
    - Convert to lowercase.
    - Remove special characters.
    - Truncate to 30 characters.
    """
    # Normalize to remove accents
    sanitized_name = unicodedata.normalize('NFKD', artist_name).encode('ASCII', 'ignore').decode('utf-8')
    
    # Convert to lowercase
    sanitized_name = sanitized_name.lower()
    
    # Remove special characters and limit length to 30 characters
    sanitized_name = re.sub(r'[^\w\s]', '', sanitized_name).strip()[:30]
    
    # Replace spaces with underscores
    sanitized_name = sanitized_name.replace(" ", "_")

    # Handle empty or missing artist names
    if not sanitized_name:
        sanitized_name = "unknown_artist"

    return sanitized_name

# Enhanced Extraction Function with Sanitization
def extract_data_with_normalization(file_path):
    try:
        with h5py.File(file_path, 'r') as hdf:
            # Extract metadata
            try:
                song_id = hdf['metadata/songs']['song_id'][0].decode('utf-8')
            except:
                return None

            try:
                artist_name = hdf['metadata/songs']['artist_name'][0].decode('utf-8')
                artist_name = sanitize_artist_name(artist_name)
            except:
                artist_name = "unknown_artist"

            try:
                title = hdf['metadata/songs']['title'][0].decode('utf-8')
            except:
                title = "unknown_title"

            # Extract and aggregate time-series data
            try:
                timbre = hdf['analysis/segments_timbre'][:]
                pitch = hdf['analysis/segments_pitches'][:]

                # Skip if data is missing
                if timbre.size == 0 or pitch.size == 0:
                    return None

                # Compute statistics
                t_mean = np.mean(timbre, axis=0).tolist()
                t_max = np.max(timbre, axis=0).tolist()
                t_min = np.min(timbre, axis=0).tolist()
                t_std = np.std(timbre, axis=0).tolist()

                p_mean = np.mean(pitch, axis=0).tolist()
                p_max = np.max(pitch, axis=0).tolist()
                p_min = np.min(pitch, axis=0).tolist()
                p_std = np.std(pitch, axis=0).tolist()

                return Row(
                    song_id=song_id,
                    artist_name=artist_name,
                    title=title,
                    timbre_mean=t_mean,
                    timbre_max=t_max,
                    timbre_min=t_min,
                    timbre_std=t_std,
                    pitch_mean=p_mean,
                    pitch_max=p_max,
                    pitch_min=p_min,
                    pitch_std=p_std
                )

            except Exception as e:
                print(f"Data extraction error: {e}")
                return None

    except Exception as e:
        print(f"File processing error: {e}")
        return None


print("Extraction function updated to include artist name sanitization.")


Extraction function updated to include artist name sanitization.


In [7]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType

# Define Schema Without 'year'
schema_by_artist = StructType([
    StructField("song_id", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("title", StringType(), True),
    StructField("timbre_mean", ArrayType(FloatType()), True),
    StructField("timbre_max", ArrayType(FloatType()), True),
    StructField("timbre_min", ArrayType(FloatType()), True),
    StructField("timbre_std", ArrayType(FloatType()), True),
    StructField("pitch_mean", ArrayType(FloatType()), True),
    StructField("pitch_max", ArrayType(FloatType()), True),
    StructField("pitch_min", ArrayType(FloatType()), True),
    StructField("pitch_std", ArrayType(FloatType()), True)
])

In [8]:
# Recreate DataFrame using the updated extraction function
files = glob.glob(INPUT_PATH, recursive=True)

data_rdd = spark.sparkContext.parallelize(files, numSlices=6) \
             .map(extract_data_with_normalization) \
             .filter(lambda x: x is not None)

# Create DataFrame
spark_df = spark.createDataFrame(data_rdd, schema_by_artist)

# Verify schema and preview data
spark_df.printSchema()
spark_df.show(5, truncate=False)

root
 |-- song_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- timbre_mean: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- timbre_max: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- timbre_min: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- timbre_std: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- pitch_mean: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- pitch_max: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- pitch_min: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- pitch_std: array (nullable = true)
 |    |-- element: float (containsNull = true)



                                                                                                    

+------------------+--------------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------+------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------+
|song_id           |ar

In [9]:
import shutil
import os

# Define the output path for partitioned Parquet data
PARTITIONED_OUTPUT_PATH = "/kaggle/working/msd_agg_parquet_by_artist"

# Remove the directory if it already exists
if os.path.exists(PARTITIONED_OUTPUT_PATH):
    shutil.rmtree(PARTITIONED_OUTPUT_PATH)

# Export the DataFrame partitioned by 'artist_name'
spark_df.write.mode("overwrite").partitionBy("artist_name").parquet(PARTITIONED_OUTPUT_PATH)

print(f"Data successfully exported to {PARTITIONED_OUTPUT_PATH}, partitioned by sanitized 'artist_name'.")

                                                                                                    

Data successfully exported to /kaggle/working/msd_agg_parquet_by_artist, partitioned by sanitized 'artist_name'.


In [10]:
# Define CSV output path
CSV_OUTPUT_PATH = "/kaggle/working/msd_sample.csv"

# Extract a sample of 1000 rows
sample_df = spark_df.limit(1000)

# Export the sample to CSV
sample_df.toPandas().to_csv(CSV_OUTPUT_PATH, index=False)

print(f"Sample data exported to {CSV_OUTPUT_PATH}")

                                                                                                    

Sample data exported to /kaggle/working/msd_sample.csv


In [11]:
# Read and display the first few rows
sample_data = pd.read_csv(CSV_OUTPUT_PATH)
print(sample_data.head())

              song_id     artist_name                title  \
0  SOTEDAD12A8AE4735F         orbital              Forever   
1  SOUWZWV12AB0181ACF      kris_gruen          Prayer Walk   
2  SOFAJOM12A8C141EE4         4_skins              New War   
3  SOCCBOH12A8C13E947          rockit  Some Kind of Record   
4  SOBBWJS12AB0182522  betika__daouda   C'est pas ma faute   

                                         timbre_mean  \
0  [39.321571350097656, 6.516843795776367, 23.708...   
1  [43.89027786254883, -89.25192260742188, -22.63...   
2  [46.81022644042969, 41.30602264404297, 40.8432...   
3  [46.884056091308594, 31.062284469604492, 12.59...   
4  [44.009483337402344, -0.6849293112754822, -35....   

                                          timbre_max  \
0  [48.8380012512207, 191.81300354003906, 189.507...   
1  [50.505001068115234, 171.1300048828125, 139.67...   
2  [52.86899948120117, 171.1300048828125, 127.591...   
3  [54.18000030517578, 129.39599609375, 105.94899...   
4  [52.166

In [12]:
# Define schema log path
SCHEMA_LOG_PATH = "/kaggle/working/schema_log.txt"

# Log the schema to a text file
with open(SCHEMA_LOG_PATH, "w") as file:
    file.write(str(spark_df.schema))

print(f"Schema logged successfully at {SCHEMA_LOG_PATH}")

Schema logged successfully at /kaggle/working/schema_log.txt


In [13]:
# Display the first few lines of the schema log
with open(SCHEMA_LOG_PATH, "r") as file:
    print(file.read())

StructType([StructField('song_id', StringType(), True), StructField('artist_name', StringType(), True), StructField('title', StringType(), True), StructField('timbre_mean', ArrayType(FloatType(), True), True), StructField('timbre_max', ArrayType(FloatType(), True), True), StructField('timbre_min', ArrayType(FloatType(), True), True), StructField('timbre_std', ArrayType(FloatType(), True), True), StructField('pitch_mean', ArrayType(FloatType(), True), True), StructField('pitch_max', ArrayType(FloatType(), True), True), StructField('pitch_min', ArrayType(FloatType(), True), True), StructField('pitch_std', ArrayType(FloatType(), True), True)])


In [14]:
import os

def get_dir_size(path):
    """Calculate the total size of the directory in MB."""
    total_size = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total_size += os.path.getsize(fp)
    return total_size / (1024 * 1024)  # Convert to MB

# Calculate Parquet and CSV sizes
parquet_size = get_dir_size(PARTITIONED_OUTPUT_PATH)
csv_size = os.path.getsize(CSV_OUTPUT_PATH) / (1024 * 1024)  # Convert to MB

print(f"Parquet Size: {parquet_size:.2f} MB")
print(f"CSV Sample Size: {csv_size:.2f} MB")

Parquet Size: 31.85 MB
CSV Sample Size: 1.75 MB


In [16]:
import shutil

# Define ZIP output path
ZIP_OUTPUT_PATH = "/kaggle/working/msd_parquet_by_artist.zip"

# Path to Parquet directory
PARQUET_DIR = "/kaggle/working/msd_agg_parquet_by_artist"

# Create ZIP
shutil.make_archive(ZIP_OUTPUT_PATH.replace(".zip", ""), 'zip', PARQUET_DIR)

print(f"Zipping completed. ZIP file created at: {ZIP_OUTPUT_PATH}")

Zipping completed. ZIP file created at: /kaggle/working/msd_parquet_by_artist.zip


In [17]:
# Drop existing collection
try:
    client = MongoClient(MONGO_URI)
    db = client["msd_database"]
    db["song_data"].drop()
    print("Existing MongoDB collection 'song_data' dropped successfully.")
except Exception as e:
    print(f"Error dropping MongoDB collection: {e}")

Existing MongoDB collection 'song_data' dropped successfully.


In [18]:
# MongoDB Ingestion - Using Defined Path and Connection

from pymongo import MongoClient

# Establish MongoDB Connection
try:
    client = MongoClient(MONGO_URI)
    db = client["msd_database"]
    collection = db["song_data"]
    print("MongoDB connection established successfully.")
except Exception as e:
    print(f"MongoDB connection failed: {e}")

# Function to Convert DataFrame Row to Dictionary
# Function to convert DataFrame rows to MongoDB-compatible dictionaries
def row_to_dict(row):
    return {
        "song_id": row.song_id,
        "artist_name": row.artist_name,
        "title": row.title,
        "timbre_mean": row.timbre_mean,
        "timbre_max": row.timbre_max,
        "timbre_min": row.timbre_min,
        "timbre_std": row.timbre_std,
        "pitch_mean": row.pitch_mean,
        "pitch_max": row.pitch_max,
        "pitch_min": row.pitch_min,
        "pitch_std": row.pitch_std
    }

# Re-ingest data to MongoDB
BATCH_SIZE = 1000
data = spark_df.rdd.map(row_to_dict).collect()

try:
    for i in range(0, len(data), BATCH_SIZE):
        batch = data[i:i + BATCH_SIZE]
        collection.insert_many(batch, ordered=False)
        print(f"Inserted batch {i // BATCH_SIZE + 1} of {len(data) // BATCH_SIZE + 1}")
except Exception as e:
    print(f"Error during MongoDB ingestion: {e}")

print("Data re-ingested to MongoDB successfully.")


MongoDB connection established successfully.


                                                                                                    

Inserted batch 1 of 11
Inserted batch 2 of 11
Inserted batch 3 of 11
Inserted batch 4 of 11
Inserted batch 5 of 11
Inserted batch 6 of 11
Inserted batch 7 of 11
Inserted batch 8 of 11
Inserted batch 9 of 11
Inserted batch 10 of 11
Data re-ingested to MongoDB successfully.


In [19]:
# Ensure MongoDB connection is still active
try:
    # Create Unique Index on song_id
    collection.create_index("song_id", unique=True)
    print("Index created on 'song_id' (Unique).")

    # Create Index on artist_name
    collection.create_index("artist_name")
    print("Index created on 'artist_name'.")

    # Create Index on title
    collection.create_index("title")
    print("Index created on 'title'.")

except Exception as e:
    print(f"Index creation failed: {e}")

Index created on 'song_id' (Unique).
Index created on 'artist_name'.
Index created on 'title'.
