In [1]:
# import packages
import h5py
import time
import sys
import glob
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

In [2]:
#Creating the Spark Session
spark = SparkSession.builder\
.master("spark://spark-master:7077") \
.appName("experiment")\
.config("spark.cores.max", 1) \
.config("spark.driver.memory",'1g') \
.getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/21 18:51:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Create Spark dataframe from h5df files:

In [3]:
# create Spark data frame 

start_time = time.perf_counter()

data_folder_path = '/opt/workspace/datascaleup/songs/*/*/*/*.h5'

#################################################
# read a dataset and return it as a Python list #

def f(path):
    with h5py.File(path,'r') as f:
        data = f['/metadata/songs']
        return [tuple(metadata if type(metadata)!= bytes else metadata.decode("utf-8")  for metadata in data[0].tolist())]
    
################################################

songs_hdf5_files_paths_rdd = sc.parallelize(list(glob.glob(data_folder_path)))

songs_metadata_rdd = songs_hdf5_files_paths_rdd.flatMap(f)
schema= ['analyzer_version', 'artist_7digitalid', 'artist_familiarity', 'artist_hotttnesss', 'artist_id', 
         'artist_latitude', 'artist_location', 'artist_longitude', 'artist_mbid', 'artist_name', 'artist_playmeid', 
         'genre', 'idx_artist_terms', 'idx_similar_artists', 'release', 'release_7digitalid', 
         'song_hotttnesss', 'song_id', 'title', 'track_7digitalid']

songs_metadata_df= songs_metadata_rdd.toDF(schema)

end_time = time.perf_counter()

print(f"Runtime: {end_time - start_time:.2f} seconds")

                                                                                

Runtime: 5.88 seconds


# Simple operation (rows count):

In [4]:
# return the number of songs

start_time = time.perf_counter()

songs_metadata_df.count()

end_time = time.perf_counter()
print(f"Runtime: {end_time - start_time:.2f} seconds")



Runtime: 19.95 seconds


                                                                                

# Complex operation (sort):

In [5]:
start_time = time.perf_counter()
unique_songs = songs_metadata_df.dropDuplicates(["title"])
songs_by_artist = unique_songs.filter(songs_metadata_df.song_hotttnesss.isNotNull()) \
    .groupBy("artist_name") \
    .agg({"song_hotttnesss": "max", "title": "collect_list"}) \
    .withColumnRenamed("max(song_hotttnesss)", "max_song_hotttnesss") \
    .withColumnRenamed("collect_list(title)", "sorted_songs") \
    .select("artist_name", "max_song_hotttnesss", "sorted_songs") \
    .orderBy(desc("max_song_hotttnesss"))

# Print the artists with all of their songs sorted by song hotness
for row in songs_by_artist.collect():
    artist_name = row["artist_name"]
    max_song_hotttnesss = row["max_song_hotttnesss"]
    sorted_songs = row["sorted_songs"]
    if max_song_hotttnesss is not None:
        print(f"{artist_name}:")
        for song in sorted_songs:
            print(f"    {song}")
            
end_time = time.perf_counter()
print(f"Runtime: {end_time - start_time:.2f} seconds")

                                                                                

(hed) p.e.:
    A Soldiers Intro
1. Futurologischer Congress:
    Die Art von Mann
    La Excursion
    Machinenfunk
    Tokyo
2 Minutos:
    Cancion De Amor
    Como Caramelo De Limón
    Rompan Todo
    Tema De Adrian
    Valentin Alsina
    Vos No Confiaste
    14 Botellas
    1987
    2 Minutos
2-4 Grooves feat. Reki D.:
    Relax
2-Gether feat. Sarinah:
    Feelin` Alone
20/20:
    I Never Did No Hitler
    It Goes On
    So Many Reasons
23rd Turnoff:
    Michaelangelo
2Fresh:
    Get It Started
2raumwohnung:
    Ja (Hanno's Poppersmonstermix)
    Lachen und weinen
    Mädchen mit Plan
    Rette Mich Später (Michael Ilbert Radio Mix)
    36 Grad (Paul van Dyk's Vandit Clubmix)
3 Doors Down:
    Be Like That
    Better Life
    Duck And Run
    Here Without You
    It's Not Me
    Sarah Yellin'
    Who Are You
3 Steps Ahead:
    My Mind Is Gone
3 of Hearts:
    Is It Love
3OH!3:
    CHOKECHAIN (Amended Album Version)
4 Skins:
    A.C.A.B. (Bumper Sessions)
    Bread Or Blood (Singl

In [6]:
spark.stop()