### 1. Start Spark Session & Load Data

In [1]:
# Print the installed PySpark version
import pyspark
print(pyspark.__version__)

3.5.4


In [2]:
!which python3

/usr/bin/python3


In [3]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import avg, desc
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pandas as pd
from operator import add
import time

# Build (or reuse) the Spark session for this notebook.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.130:7077")
    .appName("Group10")
    # Dynamic allocation with shuffle tracking; the external shuffle service is off
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Cores per executor and the application-wide core cap
    .config("spark.executor.cores", 2)
    .config("spark.cores.max", "12")
    # Fixed ports for the driver and its block manager
    .config("spark.driver.port", 9999)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 21:30:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Get the SparkContext from the session (entry point for the RDD API)
spark_context = spark_session.sparkContext
# Set the context's log level to ERROR
spark_context.setLogLevel("ERROR")

In [5]:
# Verify that h5py is importable in the Python process running this notebook
# (the Spark driver) and bring h5py / BytesIO into scope for the cells below.
# NOTE: this check does not prove that the executors have h5py installed.
try:
    import h5py
    from io import BytesIO
except ImportError:
    print("!!! [ERROR] h5py is not installed in this notebook's (driver) Python environment !!!")
    raise  # re-raise the original ImportError with its traceback intact
else:
    print("h5py version:", h5py.__version__)

h5py version: 3.13.0


In [6]:
# # recognize to file path
# # df = spark_session.read.format("binaryFile").load("hdfs://192.168.2.130:9000/data/MillionSongSubset/A/A/A/TRAAAAW128F429D538.h5")
# df = spark_session.read.format("binaryFile").load("hdfs://192.168.2.130:9000/data/MillionSongSubset/A/N/F/TRANFRL128F931CF30.h5")
# # df.show(5, False)  # have a look at the dataset
# df.select("path").show(5, False)

### 2. Inspect a single HDF5 file to understand the data layout

In [7]:
# Load one HDF5 track file from HDFS through the binaryFile reader
df = (
    spark_session.read
    .format("binaryFile")
    .load("hdfs://192.168.2.130:9000/data/MillionSongSubset/A/F/I/TRAFINB128F426E2F1.h5")
)

# Show the path and byte length to confirm the file was found and is non-empty
df.select("path", "length").show(truncate=False)


                                                                                

+----------------------------------------------------------------------------+------+
|path                                                                        |length|
+----------------------------------------------------------------------------+------+
|hdfs://192.168.2.130:9000/data/MillionSongSubset/A/F/I/TRAFINB128F426E2F1.h5|165602|
+----------------------------------------------------------------------------+------+



In [8]:
# Pull the raw bytes of the single file back to the driver
content_rows = df.select("content").collect()
binary_data = content_rows[0][0]

# Open the bytes as an in-memory HDF5 file and list its top-level keys
with h5py.File(BytesIO(binary_data), "r") as h5_file:
    top_level_keys = list(h5_file.keys())
    print("HDF5 contains keys:", top_level_keys)


HDF5 contains keys: ['analysis', 'metadata', 'musicbrainz']


In [9]:
# Retrieve the keys stored under the 'metadata' group
with h5py.File(BytesIO(binary_data), "r") as h5_file:
    if "metadata" not in h5_file:
        print("No 'metadata' found in the HDF5 file.")
    else:
        metadata_group = h5_file["metadata"]
        metadata_keys = list(metadata_group.keys())
        print("Metadata keys:", metadata_keys)


Metadata keys: ['artist_terms', 'artist_terms_freq', 'artist_terms_weight', 'similar_artists', 'songs']


In [10]:
# Print every field of the first record in metadata/songs
with h5py.File(BytesIO(binary_data), "r") as h5_file:
    has_songs = "metadata" in h5_file and "songs" in h5_file["metadata"]
    if not has_songs:
        print("No 'songs' data found in metadata.")
    else:
        # Read the whole dataset into memory
        songs_data = h5_file["metadata"]["songs"][:]

        # Field names come from the dataset's dtype
        feature_names = songs_data.dtype.names
        print("Features in metadata['songs']:", feature_names)

        # Pair each field name with its value in the first record
        first_entry = songs_data[0]
        for name, value in zip(feature_names, first_entry):
            print(f"{name}: {value}")


Features in metadata['songs']: ('analyzer_version', 'artist_7digitalid', 'artist_familiarity', 'artist_hotttnesss', 'artist_id', 'artist_latitude', 'artist_location', 'artist_longitude', 'artist_mbid', 'artist_name', 'artist_playmeid', 'genre', 'idx_artist_terms', 'idx_similar_artists', 'release', 'release_7digitalid', 'song_hotttnesss', 'song_id', 'title', 'track_7digitalid')
analyzer_version: b''
artist_7digitalid: 71484
artist_familiarity: 0.4059239066513581
artist_hotttnesss: 0.281396284444565
artist_id: b'AR15YLD1187FB3D4DD'
artist_latitude: 55.67631
artist_location: b'Copenhagen, Denmark'
artist_longitude: 12.56935
artist_mbid: b'27fc27ae-3d8d-40cd-b3cf-ed2541a2321d'
artist_name: b'John Tchicai'
artist_playmeid: 56888
genre: b''
idx_artist_terms: 0
idx_similar_artists: 0
release: b'Hymn To Sophia (Hymne Til Sofia)'
release_7digitalid: 91162
song_hotttnesss: 0.0
song_id: b'SOEULPB12A8C136DDF'
title: b'Musica Sacra Nova: Morgen I Frydenlund'
track_7digitalid: 965262


In [11]:
# Extract the title and artist name of the first record in metadata/songs
with h5py.File(BytesIO(binary_data), "r") as h5_file:
    if "metadata" not in h5_file or "songs" not in h5_file["metadata"]:
        print("No 'songs' data found in metadata.")
    else:
        songs_data = h5_file["metadata"]["songs"][:]
        first_song = songs_data[0]

        # Byte strings are decoded; anything else falls back to str()
        raw_title = first_song["title"]
        song_title = raw_title.decode() if isinstance(raw_title, bytes) else str(raw_title)

        raw_artist = first_song["artist_name"]
        artist_name = raw_artist.decode() if isinstance(raw_artist, bytes) else str(raw_artist)

        print(f"Song Title: {song_title}, Artist: {artist_name}")

Song Title: Musica Sacra Nova: Morgen I Frydenlund, Artist: John Tchicai


### 3. Top 5 Artists by Average Artist Hotttnesss

In [12]:
# Record the wall-clock start time; used later to report the total job duration
total_start_time = time.time()

In [13]:
# Recursively read every file under the dataset root, keeping only path and raw bytes
hdfs_base_path = "hdfs://192.168.2.130:9000/data/MillionSongSubset/"
df_files = (
    spark_session.read
    .format("binaryFile")
    .option("recursiveFileLookup", "true")
    .load(hdfs_base_path)
    .select("path", "content")
)

In [14]:
# Repartition the file DataFrame into a fixed number of partitions to limit
# the number of worker tasks and prevent overloading of resources
MAX_PARTITIONS = 30
df_files = df_files.repartition(MAX_PARTITIONS)

In [15]:
# parse HDF5 
def parse_hdf5(iterator):
    """Parse one partition of binaryFile rows into artist/hotttnesss rows.

    Intended for ``mapPartitions``: each input row must expose ``row["path"]``
    and ``row["content"]`` (the raw bytes of one HDF5 file). Only the first
    record of ``metadata/songs`` is read from each file.

    Args:
        iterator: Iterable of rows with ``path`` and ``content`` fields.

    Yields:
        ``Row(artist_name=str, artist_hotttnesss=float)`` for every file whose
        hotttnesss value is a real number. Files without ``metadata/songs`` and
        files whose value is NaN are skipped. Files that fail to parse are
        reported with ``print`` and skipped rather than raised.
    """
    # Imported locally so the function carries its own dependency.
    import math

    partition_start_time = time.time()  # record partition start time

    for row in iterator:
        file_path = row["path"]
        binary_data = row["content"]
        try:
            with h5py.File(BytesIO(binary_data), "r") as h5_file:
                if "metadata" not in h5_file or "songs" not in h5_file["metadata"]:
                    continue

                songs_data = h5_file["metadata"]["songs"][:]
                first_song = songs_data[0]

                # Byte strings are decoded; anything else falls back to str()
                raw_name = first_song["artist_name"]
                artist_name = raw_name.decode() if isinstance(raw_name, bytes) else str(raw_name)

                # The value is numeric, so comparing it with the string "nan"
                # never filters anything; test for NaN on the float itself.
                artist_hotttnesss = float(first_song["artist_hotttnesss"])
                if math.isnan(artist_hotttnesss):
                    continue

                yield Row(artist_name=artist_name, artist_hotttnesss=artist_hotttnesss)
        except Exception as e:
            # Best effort: one unreadable file must not fail the whole partition
            print(f"Error processing {file_path}: {e}")

    partition_end_time = time.time()
    print(f"Partition executed in {partition_end_time - partition_start_time:.2f} seconds")

In [16]:
# Parse the HDF5 files in parallel: apply parse_hdf5 to each partition of the
# DataFrame's underlying RDD
rdd_parsed = df_files.rdd.mapPartitions(parse_hdf5)




In [17]:
# Convert the parsed RDD to a DataFrame with an explicit two-column, nullable schema
schema = (
    StructType()
    .add("artist_name", StringType(), True)
    .add("artist_hotttnesss", FloatType(), True)
)
df_songs = spark_session.createDataFrame(rdd_parsed, schema=schema)

In [18]:
# Average artist_hotttnesss per artist and display the five highest averages
start_time = time.time()  # timestamp taken just before the query is executed

(
    df_songs
    .groupBy("artist_name")
    .agg(avg("artist_hotttnesss").alias("avg_hotttnesss"))
    .orderBy(desc("avg_hotttnesss"))
    .show(5, truncate=False)
)



+----------------------------------+------------------+
|artist_name                       |avg_hotttnesss    |
+----------------------------------+------------------+
|Kanye West                        |1.0825026035308838|
|Kanye West / Consequence / Cam'Ron|1.0825026035308838|
|Kanye West / Lupe Fiasco          |1.0825026035308838|
|Kanye West / Adam Levine          |1.0825026035308838|
|Daft Punk                         |1.021255612373352 |
+----------------------------------+------------------+
only showing top 5 rows



                                                                                

In [19]:
# NOTE(review): start_time is taken in the previous cell, right before the
# .show() call, which appears to be the first Spark action in this section.
# The interval reported as "Aggregation" therefore presumably also covers
# reading and parsing the files, plus any delay between running the two
# cells. Confirm before quoting it as aggregation-only time.
end_time = time.time()  # record end time
print(f"Aggregation took {end_time - start_time:.2f} seconds")
# Record the total time since total_start_time was captured
total_end_time = time.time()
print(f"Total Spark job executed in {total_end_time - total_start_time:.2f} seconds")

Aggregation took 15.48 seconds
Total Spark job executed in 40.60 seconds


In [20]:
# Stop the Spark session now that the job has finished
spark_session.stop()