In [1]:
from pyspark.sql import SparkSession
import time

# Start stopwatch
startTime = time.time()

# Alternate the amount of cores for performance testing

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.1.153:7077") \
        .appName("group_09_scale_test")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 8)\
        .config("spark.cores.max","16")\
        .config("spark.executor.instances","2")\
        .getOrCreate()
      
        
# Old API (RDD)
spark_context = spark_session.sparkContext

In [2]:
import h5py
import sys
import io

# Duplicate the dataset to test for scalability
rdd_1 = spark_context.binaryFiles("hdfs://192.168.1.153:9000/millionsongs/data/*/*/*/*")
rdd_2 = spark_context.binaryFiles("hdfs://192.168.1.153:9000/millionsongs/data/*/*/*/*")

rdd = rdd_1.union(rdd_2) # 2x data
rdd = rdd.union(rdd)     # 4x data
rdd = rdd.union(rdd)     # 8x data
rdd = rdd.union(rdd)     # 16x data
     # read a dataset and return it as a Python list #

# Function that reads the h5 format and extracts the desired feature, in this case duration
def f(x):
    # x[0] = filename
    # x[1] = binary content
    with h5py.File(io.BytesIO(x[1])) as f:
        # drill down with a path
        #return list(f.keys())
        g = f['analysis']['songs']['duration']
        # g is a 'dataset'
        return list(g)

# Apply function to the data set
rdd = rdd.map(f).cache()

In [3]:
# Function that discretizes the duration into half minutes
def time_conversion(duration):
    duration_in_half_minutes = duration[0]/30
    discretitized_duration = round(duration_in_half_minutes)
    discretized_duration_in_minutes = discretitized_duration/2
    return discretized_duration_in_minutes

# Map function to the durations in the RDD
converted_rdd = rdd.map(time_conversion)

In [4]:
# Map reduce by duration length
result = converted_rdd.map(lambda time: (time, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[0], True)

# Stop stopwatch and print time elapsed
endTime = time.time()
print("Time Elapsed {}".format((endTime-startTime)/60))

Time Elapsed 7.5683817585309345


In [5]:
# Plot the song duration distribution
import matplotlib.pyplot as plt
plt.bar(*zip(*result.collect()))
plt.show()

<Figure size 640x480 with 1 Axes>