In [1]:
#PYTHON
import h5py
import numpy as np
import os
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import pyspark

#SPARK
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from operator import add

import sys

# Append the path to Pydoop to sys.path
#sys.path.append("/usr/local/lib/python3.8/dist-packages")

Spark Configuration

In [2]:
def create_spark_session(num_workers, cpu):
    """Build a YARN-backed SparkSession for one benchmark configuration.

    Parameters
    ----------
    num_workers : int
        Number of executors (``spark.executor.instances``).
    cpu : int
        Cores per executor (``spark.executor.cores``).

    Returns
    -------
    tuple
        ``(spark_session, spark_context)``; the context's log level is set to WARN.

    Note: ``getOrCreate()`` hands back an already-active session if one exists,
    so the previous session must be stopped before requesting a new
    executor configuration.
    """
    app_name = "Iteration:3_15_000_Files_workers:" + str(num_workers) + "_cpu:" + str(cpu)
    spark_session = (
        SparkSession.builder
        .appName(app_name)
        .master("yarn")
        .config("spark.executor.instances", num_workers)
        .config("spark.executor.cores", cpu)
        .getOrCreate()
    )

    spark_context = spark_session.sparkContext
    spark_context.setLogLevel("WARN")
    # The former `SQLContext(spark_context)` was never used (and SQLContext is
    # deprecated since Spark 3.0); the SparkSession covers all SQL needs.
    return spark_session, spark_context

In [3]:
_HDFS_ROOT = 'hdfs://master-node:9000/user/hadoop/'

# Default inputs, read together in one job: MSD_ASCI_SUB/subfolder_0 .. subfolder_4
# (subfolder_5 is not included) plus the whole MSD_ASCI directory.
DEFAULT_INPUT_PATHS = (
    [_HDFS_ROOT + 'MSD_ASCI_SUB/subfolder_' + str(i) + '/' for i in range(5)]
    + [_HDFS_ROOT + 'MSD_ASCI/']
)

# (column name, position of the value in a file's comma-separated record),
# listed in DataFrame column order.
_SELECTED_FIELDS = [
    ("duration", 3),
    ("end_of_fade_in", 4),
    ("start_of_fade_out", 26),
    ("loudness", 23),
    ("mode", 24),
    ("mode_confidence", 25),
    ("tempo", 27),
    ("time_signature", 28),
    ("time_signature_confidence", 29),
]


def _parse_track_file(content):
    """Extract the selected fields of one input file's text as floats.

    Newlines, curly braces and spaces are removed, the remainder is split on
    commas, and the values at the positions listed in ``_SELECTED_FIELDS`` are
    converted with ``float``.
    """
    flat = (content.replace('\n', '')
                   .replace('{', '')
                   .replace('}', '')
                   .replace(' ', ''))
    values = flat.split(',')
    return [float(values[index]) for _, index in _SELECTED_FIELDS]


def run_computations(spark_session, spark_context, paths=None):
    """Load the input files from HDFS, build a typed DataFrame and average every column.

    Parameters
    ----------
    spark_session : SparkSession
        Session used to create the DataFrame.
    spark_context : SparkContext
        Context used to read the raw files.
    paths : list of str or str, optional
        Input directories/files. Defaults to ``DEFAULT_INPUT_PATHS``.

    Returns
    -------
    pandas.DataFrame
        One-row frame holding the average of each column (``avg(<column>)``).
    """
    if paths is None:
        paths = DEFAULT_INPUT_PATHS
    elif isinstance(paths, str):
        paths = [paths]

    # wholeTextFiles yields one (path, content) pair per file; a comma-joined
    # string lets a single call read every input location.
    file_contents = spark_context.wholeTextFiles(','.join(paths))
    selected_elements = file_contents.map(lambda pair: _parse_track_file(pair[1]))

    schema = StructType([
        StructField(name, FloatType(), True) for name, _ in _SELECTED_FIELDS
    ])

    # Cached because the frame is scanned three times: count, show and agg.
    df = spark_session.createDataFrame(selected_elements, schema).cache()
    try:
        num_rows = df.count()
        print("Number of rows in the DataFrame:", num_rows)

        df.show()

        average_values = df.agg({name: 'avg' for name in df.columns}).collect()[0]
        avg_df = pd.DataFrame([average_values.asDict()])
    finally:
        df.unpersist()

    # A bare expression inside a function is never rendered by Jupyter, so the
    # averages are printed explicitly and returned to the caller.
    print(avg_df.to_string(index=False))
    return avg_df

    

In [4]:
# Benchmark: run the same computation for every (executor count, cores per
# executor) combination and record the wall-clock time of each run.
from time import perf_counter

WORKER_COUNTS = [1, 2]
CORES_PER_EXECUTOR = [1, 2]

timings = []
for num_workers in WORKER_COUNTS:
    for cpu in CORES_PER_EXECUTOR:
        spark_session, spark_context = create_spark_session(num_workers, cpu)
        try:
            started = perf_counter()
            run_computations(spark_session, spark_context)
            elapsed = perf_counter() - started
        finally:
            # Stop even on failure: otherwise getOrCreate() in the next
            # iteration would hand back this still-active session instead of
            # starting one with the new executor settings.
            spark_session.stop()
        # Note: `round` is shadowed by `from pyspark.sql.functions import *`,
        # so format with % instead.
        print("workers:", num_workers, "cpu:", cpu, "seconds: %.2f" % elapsed)
        timings.append({"num_workers": num_workers, "cpu": cpu, "seconds": elapsed})

pd.DataFrame(timings)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/15 16:28:47 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
24/03/15 16:30:13 ERROR TransportClient: Failed to send RPC RPC 4995510330001399207 to /192.168.2.40:54884: io.netty.channel.StacklessClosedChannelException
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
24/03/15 16:30:13 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 1 at RPC address 192.168.2.124:57706, but got no response. Marking as agent lost.
java.io.IOException: Failed to send RPC RPC 4995510330001399207 to /192.168.2.40:54884: io.netty.channel.StacklessClosedChannelException
	at org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(T

Number of rows in the DataFrame: 15015
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
|duration|end_of_fade_in|start_of_fade_out|loudness|mode|mode_confidence|  tempo|time_signature|time_signature_confidence|
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
| 218.932|         0.247|          218.932| -11.197| 0.0|          0.636| 92.198|           4.0|                    0.778|
| 148.035|         0.148|          137.915|  -9.843| 0.0|           0.43|121.274|           4.0|                    0.384|
| 177.475|         0.282|          172.304|  -9.689| 1.0|          0.565| 100.07|           1.0|                      0.0|
| 233.404|           0.0|          217.124|  -9.013| 1.0|          0.749|119.293|           4.0|                      0.0|
| 209.606|         0.066|          198.699|  -4.501| 1.0|          0.371|129.738|           4.0|    

24/03/15 16:33:31 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

Number of rows in the DataFrame: 15015
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
|duration|end_of_fade_in|start_of_fade_out|loudness|mode|mode_confidence|  tempo|time_signature|time_signature_confidence|
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
| 218.932|         0.247|          218.932| -11.197| 0.0|          0.636| 92.198|           4.0|                    0.778|
| 148.035|         0.148|          137.915|  -9.843| 0.0|           0.43|121.274|           4.0|                    0.384|
| 177.475|         0.282|          172.304|  -9.689| 1.0|          0.565| 100.07|           1.0|                      0.0|
| 233.404|           0.0|          217.124|  -9.013| 1.0|          0.749|119.293|           4.0|                      0.0|
| 209.606|         0.066|          198.699|  -4.501| 1.0|          0.371|129.738|           4.0|    

24/03/15 16:34:43 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

Number of rows in the DataFrame: 15015
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
|duration|end_of_fade_in|start_of_fade_out|loudness|mode|mode_confidence|  tempo|time_signature|time_signature_confidence|
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
| 218.932|         0.247|          218.932| -11.197| 0.0|          0.636| 92.198|           4.0|                    0.778|
| 148.035|         0.148|          137.915|  -9.843| 0.0|           0.43|121.274|           4.0|                    0.384|
| 177.475|         0.282|          172.304|  -9.689| 1.0|          0.565| 100.07|           1.0|                      0.0|
| 233.404|           0.0|          217.124|  -9.013| 1.0|          0.749|119.293|           4.0|                      0.0|
| 209.606|         0.066|          198.699|  -4.501| 1.0|          0.371|129.738|           4.0|    

24/03/15 16:35:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

Number of rows in the DataFrame: 15015
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
|duration|end_of_fade_in|start_of_fade_out|loudness|mode|mode_confidence|  tempo|time_signature|time_signature_confidence|
+--------+--------------+-----------------+--------+----+---------------+-------+--------------+-------------------------+
| 218.932|         0.247|          218.932| -11.197| 0.0|          0.636| 92.198|           4.0|                    0.778|
| 148.035|         0.148|          137.915|  -9.843| 0.0|           0.43|121.274|           4.0|                    0.384|
| 177.475|         0.282|          172.304|  -9.689| 1.0|          0.565| 100.07|           1.0|                      0.0|
| 233.404|           0.0|          217.124|  -9.013| 1.0|          0.749|119.293|           4.0|                      0.0|
| 209.606|         0.066|          198.699|  -4.501| 1.0|          0.371|129.738|           4.0|    

# STOP SPARK