In [27]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, FloatType
from pyspark.sql.functions import col

import h5py
import numpy as np
import pandas as pd
import glob

In [29]:
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.130:7077") \
        .appName("de16_sparky_ludde57")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","300s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .config("spark.hadoop.fs.defaultFS", "hdfs://192.168.2.130:9000")\
        .getOrCreate()

sc = spark_session.sparkContext

ConnectionRefusedError: [Errno 111] Connection refused

In [10]:
columns = StructType([
        StructField('danceability', DoubleType(), nullable=True),
        StructField('song_hotttnesss', DoubleType(), nullable=True),
        StructField('energy', DoubleType(), nullable=True),
        StructField('duration', DoubleType(), nullable=True),
        StructField('key', IntegerType(), nullable=True),
        StructField('loudness', DoubleType(), nullable=True),
        StructField('tempo', DoubleType(), nullable=True),
        StructField('time_signature', DoubleType(), nullable=True),
        StructField('year', IntegerType(), nullable=True)
    ])

def init_df():
    df = spark_session.createDataFrame([], columns)
    return df

In [11]:
import io

def add_song(df, path):
    binary = spark_session.read.format("binaryFile").load(path)
    content = io.BytesIO(binary.first()['content'])
    with h5py.File(content, 'r') as file:
        new_song_values = Row(
            danceability = float(file['analysis']['songs'][0][2]),  # DoubleType
            song_hotttnesss = float(file['metadata']['songs'][0][16]),  # DoubleType
            energy = float(file['analysis']['songs'][0][5]),  # DoubleType
            duration = float(file['analysis']['songs'][0][3]),  # DoubleType
            key = int(file['analysis']['songs'][0][21]),  # IntegerType
            loudness = float(file['analysis']['songs'][0][23]),  # DoubleType
            tempo = float(file['analysis']['songs'][0][27]),  # DoubleType
            time_signature = float(file['analysis']['songs'][0][28]),  # DoubleType
            year = int(file['musicbrainz']['songs'][0][1])  # IntegerType
        )
        new_song = spark_session.createDataFrame([new_song_values], columns)
    return df.union(new_song)

In [12]:
df = init_df()

def list_h5_files(path):
    hadoop_conf = sc._jsc.hadoopConfiguration()
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
    files = []
    status = fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path(path))
    for file_status in status:
        file_path = file_status.getPath()
        if file_status.isDirectory():
            files.extend(list_h5_files(file_path.toString()))
        elif file_path.toString().endswith(".h5"):
            files.append(file_path.toString())
    return files

base_directory = "hdfs://192.168.2.130:9000/user/MillionSongSubset"
song_paths = list_h5_files(base_directory)


for i in range(100):  # Max 10000
    if i % 10 == 0:
        print(i)
    df = add_song(df, song_paths[i])

path = "hdfs://192.168.2.130:9000/user/MillionSongSubset/A/A/A/TRAAAAW128F429D538.h5"

df.show()
df.printSchema()
print(df.count())
print(df.rdd.getNumPartitions())

0
10
20
30
40
50
60
70
80
90


                                                                                

+------------+-------------------+------+---------+---+--------+-------+--------------+----+
|danceability|    song_hotttnesss|energy| duration|key|loudness|  tempo|time_signature|year|
+------------+-------------------+------+---------+---+--------+-------+--------------+----+
|         0.0| 0.6021199899057548|   0.0|218.93179|  1| -11.197| 92.198|           4.0|   0|
|         0.0|                NaN|   0.0|148.03546|  6|  -9.843|121.274|           4.0|1969|
|         0.0|                NaN|   0.0|177.47546|  8|  -9.689| 100.07|           1.0|   0|
|         0.0|                NaN|   0.0|233.40363|  0|  -9.013|119.293|           4.0|1982|
|         0.0| 0.6045007385888197|   0.0|209.60608|  2|  -4.501|129.738|           4.0|2007|
|         0.0|                NaN|   0.0| 267.7024|  5|  -9.323|147.782|           3.0|   0|
|         0.0|                NaN|   0.0|114.78159|  1| -17.302|111.787|           1.0|   0|
|         0.0|                NaN|   0.0|189.57016|  4| -11.642| 101.4

                                                                                

100
202


In [17]:
import io

# ... (other code definitions)

# Parallelized file processing with foreachPartition
def process_partition(iterator):
    song_data = []
    for path in iterator:
        # Process individual files within a partition
        row = add_song(spark_session.createDataFrame([], columns), path)
        song_data.append(row)
    return song_data

# Listing HDF5 files using Spark's FileSystem

song_paths_rdd = sc.parallelize(song_paths)

# Parallel processing
song_data = []
song_paths_rdd.foreachPartition(process_partition, includes=song_data)

# Combine processed song data
df = spark.createDataFrame(song_data, columns)

df.show()
df.printSchema()
print(df.count())
print(df.rdd.getNumPartitions())


TypeError: foreachPartition() got an unexpected keyword argument 'includes'

In [None]:
spark_session.stop


In [18]:

def collect_song_data(iterator):
    song_data = []
    for path in iterator:
        row = add_song(spark_session.createDataFrame([], columns), path)
        song_data.append(row)
    return song_data

song_paths_rdd = sc.parallelize(song_paths)

song_data = song_paths_rdd.mapPartitions(collect_song_data).collect()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Py4JError: An error occurred while calling o30.sc