# Context Setup

In [1]:
from pprint import pprint
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from tabulate import tabulate
import numpy as np
from timeit import default_timer as timer

# Setup spark session against the standalone cluster master.
SPARK_MASTER_URL = "spark://192.168.2.102:7077"

# Session options, applied in this order. Dynamic allocation runs without an
# external shuffle service, relying on shuffle tracking instead (Spark warns
# that this combination is experimental).
SPARK_CONF = {
    "spark.dynamicAllocation.enabled": True,
    "spark.dynamicAllocation.shuffleTracking.enabled": True,
    "spark.shuffle.service.enabled": False,
    "spark.dynamicAllocation.executorIdleTimeout": "100s",
    "spark.sql.broadcastTimeout": "600s",
    "spark.executor.cores": 2,
    "spark.executor.memory": '4g',
    # Fixed ports -- presumably so they can be opened in a firewall/NAT; confirm.
    "spark.driver.port": 9998,
    "spark.blockManager.port": 10005,
}

session_builder = SparkSession.builder.master(SPARK_MASTER_URL)
for conf_key, conf_value in SPARK_CONF.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark_session = session_builder.appName("Benchmark").getOrCreate()

# Old API (RDD): the low-level context behind the session; used here only to
# silence log output below ERROR.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

22/03/17 09:56:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/17 09:56:22 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


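# Benchmarking

The timer started below measures the total wall-clock time of loading the data and running Tasks 1–4.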

In [2]:
# Wall-clock start of the benchmark. Everything from the data import through
# Task 4 is included in the elapsed time computed at the end.
start_time = timer()

## Import and prepare data

In [3]:
# Import data: every 2021 CSV file on HDFS; the first row of each file is the header.
# No schema inference is requested, so all columns are read as strings.
DATA_PATH = 'hdfs://192.168.2.102:9000/data_2021/*.csv'
data_frame = spark_session.read.csv(DATA_PATH, header=True)

# Keep only the columns the tasks use, and cache them. cache() is lazy, so the
# cache is filled by the first action that reads filtered_df.
column_filter = ["date", "serial_number", "model", "failure", "smart_9_raw"]
filtered_df = data_frame.select(*column_filter).cache()

                                                                                

## Task 1: Number of drives per date

In [4]:
# Count of rows per date, returned to the driver in chronological order.
# (Presumably each row is one drive's daily record, i.e. drives per day.)
num_drives_by_date = (
    filtered_df
    .groupBy("date")
    .count()
    .orderBy("date")
    .collect()
)

                                                                                

## Task 2: Number of failures per date

In [5]:
# Count of failure rows (failure == 1) per date.
# The columns were read without schema inference, so `failure` is a string
# column and is compared to 1 through Spark's implicit cast.
# Sorted by date to match Task 1: without orderBy, collect() returns the
# groups in no guaranteed order. Dates with zero failures do not appear.
num_failures_by_date = (
    filtered_df
    .filter(filtered_df["failure"] == 1)
    .groupBy("date")
    .count()
    .orderBy("date")
    .collect()
)

                                                                                

## Task 3: Failure rate per model (failures per 1,000 drives)

In [6]:
# Failure rate per model = failures / distinct drives of that model * 1000.
failed_rows = filtered_df.filter(filtered_df["failure"] == 1)
failures_by_model = (
    failed_rows
    .groupBy("model")
    .count()
    .withColumnRenamed("count", "failure_count")
)

# Number of distinct drives (serial numbers) observed per model.
model_count = (
    filtered_df
    .select("model", "serial_number")
    .distinct()
    .groupBy("model")
    .count()
)

# NOTE(review): the inner join drops models with zero failures. Combined
# with the ascending sort, the result is the 10 *lowest non-zero* rates.
# Confirm whether the task wants the highest rates (descending) instead.
accu_count = model_count.join(failures_by_model, "model")
rate_expr = (accu_count['failure_count'].cast('float') / accu_count['count'].cast('float')) * 1000
accu_fail = accu_count.withColumn('failure_rate', rate_expr)
result = accu_fail.orderBy("failure_rate").take(10)

22/03/17 09:58:05 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.2.55: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
                                                                                

## Task 4: Power-on time (`smart_9_raw`) of failed drives

In [7]:
# smart_9_raw of every failure row, collected to the driver.
# Presumably SMART attribute 9 (power-on hours); confirm against the dataset docs.
time_on_before_failed = (
    filtered_df
    .filter(filtered_df["failure"] == 1)
    .select("smart_9_raw")
    .collect()
)

                                                                                

In [8]:
# Total benchmark wall-clock time in seconds (shown as the cell output).
elapsed_seconds = timer() - start_time
elapsed_seconds

170.83874857999763

# Finish

In [9]:
# Stop the Spark session and its underlying SparkContext, releasing the executors.
spark_session.stop()