In [1]:
%load_ext autoreload
%autoreload 2

# Standard libraries
import os
import numpy as np
import pandas as pd
from math import ceil
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output


#Import all functions (improves readability)
from Functions import *

# PySpark core
from pyspark import SparkFiles
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.types import IntegerType

# PySpark functions
from pyspark.sql.functions import (
    coalesce,
    col, lit, expr, when, count, sum as spark_sum, abs as spark_abs,
    round as spark_round, min as spark_min, max as spark_max, avg as spark_avg,
    first, last, lag, row_number, desc, asc,
    explode, sequence, from_unixtime, to_date, unix_timestamp,
    window, min_by, mode, concat, monotonically_increasing_id
)

In [None]:
# Benchmark grid: the loop below runs the pipeline once per (cores, partitions) pair.
# A wider sweep was sketched earlier as np.arange(1, 17) cores and
# np.arange(2, 33, 2) partitions; it is kept here only as a reminder.
nCores = [10, 12]
nPartitions = [8, 10]

# Passed as `interval` to CreateGrid when the time grid is built.
frequency = 60

# Metric names handed to the preprocessing helpers as `sensors`.
# Stored as a whitespace-separated table and split into a flat list of strings.
sensors = """
    P1 P10 P15 P16 P17 P18 P2 P5 P6 P7 P8 P9
    S1 S10 S100 S101 S102 S106 S107 S108 S109 S11 S110 S112 S113 S114 S115 S117 S118
    S122 S123 S124 S125 S126 S127 S128 S129 S130 S137 S138 S140 S143 S147
    S15 S151 S154 S157 S158 S159 S16 S163 S164 S165 S166 S167 S169
    S17 S170 S171 S172 S173 S174 S175 S176 S178 S179 S180 S181 S183 S19
    S2 S201 S202 S203 S204 S205 S206 S25
    S3 S33 S34 S35 S37 S39
    S40 S41 S42 S43 S45 S46 S47 S49
    S5 S50 S53 S54 S55 S56 S57
    S6 S63 S64 S69
    S7 S70 S71 S72 S73
    S8 S80 S81 S83 S86
    S9 S90 S94 S97
    SA1 SA10 SA11 SA12 SA2 SA3 SA4 SA5 SA6 SA7 SA8 SA9
    SW
""".split()

# Alarm columns shown next to the engine columns in the preprocessing preview.
alarms = "A5 A9 ComError".split()

# Columns used as `sensors` for anomaly detection (all four also appear in `sensors`).
engines = "S117 S118 S169 S170".split()

# Maps (core, partitions) -> dict of per-stage durations; filled by the benchmark loop.
OptimizationResults = dict()


# Benchmark sweep: for every (core, partitions) pair, start a fresh Spark session,
# run the whole pipeline (load -> pivot -> preprocessing -> anomaly detection ->
# correlations) and record how long each stage takes in OptimizationResults.
for core in nCores:
    for partitions in nPartitions:

        # Keep only the output of the configuration currently running.
        clear_output(wait=True)

        # Per-stage durations (seconds) for this configuration.
        TimeResults = {}

        #Create spark session
        print(f'Creating Spark session for {(core, partitions)}')
        spark = CreateSparkSession(core, partitions)

        # try/finally guarantees the session is stopped even when a stage fails,
        # so a crashed run does not leak its session into the next one.
        try:

            #----------------------CREATING DATAFRAME --------------------------

            print('Reading the CSV...')
            startTime = time.perf_counter()
            # NOTE(review): the repartition count is fixed at 4 and does not follow
            # `partitions`; confirm this is intended for the sweep.
            df = (
                spark.read
                .option("header", True)
                .option("inferSchema", True)
                .csv("file:///mnt/shared/dataset.csv")
                .repartition(4, col("hwid"))
            )

            #Convert milliseconds into seconds
            df = df.withColumn("when", spark_round(col("when") / 1000).cast(IntegerType()))
            df.show(3, truncate=False)
            elapsed = time.perf_counter() - startTime

            TimeResults['LoadCSV'] = elapsed
            print('Load CSV time: ', np.round(elapsed, 2), ' seconds')


            #----------------------PREPROCESSING PIPELINE--------------------------

            # Pivot the data for all hardware units at once.
            print('Pivot dataset...')

            startTime = time.perf_counter()
            df_all_hw = Pivot(df)

            print('Persist the dataframe...')
            df_all_hw = df_all_hw.persist()
            # show() is the action that materialises the persisted frame.
            df_all_hw.select("when", "time", *engines).orderBy(col("time").asc()).show(5)
            elapsed = time.perf_counter() - startTime

            TimeResults['Pivot'] = elapsed
            print('Pivot time: ', np.round(elapsed, 2), ' seconds')


            # Fill sensor gaps and build blocks of independent measurement
            print('Starting preprocessing...')

            startTime = time.perf_counter()
            #Create grid, homogeneous data
            df_grid = CreateGrid(df_all_hw, interval=frequency)

            #Build independent blocks
            df_blocks = BuildBlocks(df_grid, max_interval=1800, sensors=sensors)

            #Fill the NULL values
            # NOTE(review): every entry of `engines` is already in `sensors`, so those
            # columns are passed twice; confirm FillNull tolerates duplicates.
            df_blocks = FillNull(df_blocks, sensors + engines, max_gap=240).persist()
            # count() forces the persisted frame to be computed inside the timed window.
            df_blocks.count()

            df_blocks.select("hwid", "BlockID", "when", "window_start", "window_end", *engines, *alarms).show(3)
            elapsed = time.perf_counter() - startTime

            TimeResults['Preprocessing'] = elapsed
            print('Preprocessing time: ', np.round(elapsed, 2), ' seconds')

            # The pivoted frame is no longer needed once the blocks are cached.
            df_all_hw.unpersist()


            #----------------------ANOMALY DETECTION--------------------------

            useless_sensors, useful_sensors = UsefulSensors(df_blocks, sensors)
            # Distinct hardware ids, collected to the driver as a plain Python list.
            list_hw = [row["hwid"] for row in df_blocks.select("hwid").distinct().collect()]

            #Compute the anomalies for all the hardware sequentially (parallelized internally)
            print('Starting Anomaly detection...')
            startTime = time.perf_counter()

            list_df_anomalies = {}
            for hw in list_hw:
                list_df_anomalies[hw] = detect_anomalies(
                    df=df_blocks.filter(col("hwid") == hw),
                    time_separator=60 * 40,  # seconds
                    threshold=8,
                    sensors=engines)

            #show the first 3 rows of anomaly record
            # Prefer 'SW-106'; fall back to the first hardware so a dataset without it
            # does not abort the benchmark with a KeyError.
            # NOTE(review): if detect_anomalies returns lazy DataFrames, only the
            # previewed hardware is actually computed inside this timed window.
            preview_hw = 'SW-106' if 'SW-106' in list_df_anomalies else next(iter(list_df_anomalies), None)
            if preview_hw is not None:
                list_df_anomalies[preview_hw].filter('flag_S117').show(3, truncate=False)

            elapsed = time.perf_counter() - startTime

            TimeResults['AnomalyDetection'] = elapsed
            print('Anomaly detection time: ', np.round(elapsed, 2), ' seconds')


            #-----------------CORRELATIONS----------------------
            print('Starting computing Correlations')
            list_df_blocks_anom = {}

            startTime = time.perf_counter()
            for hw in list_hw:
                # Attach the anomaly flag to this hardware's block rows, matching on 'when'.
                df_anom = list_df_anomalies[hw].select('when', 'flag_anomaly')
                list_df_blocks_anom[hw] = df_blocks.filter(col("hwid") == hw).join(df_anom, on='when', how='left')

                anomaly_corr = correlations(list_df_blocks_anom[hw], useful_sensors, 'flag_anomaly')
                # First three entries of the 'Sensors' column of the correlation result.
                top_anom_corr = anomaly_corr.head(3)['Sensors'].tolist()
                print(hw, top_anom_corr)
            elapsed = time.perf_counter() - startTime

            TimeResults['Correlations'] = elapsed
            print('Correlation time: ', np.round(elapsed, 2), ' seconds')


            OptimizationResults[(core, partitions)] = TimeResults

            # Checkpoint after every configuration. The dict is stored as a pickled
            # object array: reload with np.load(..., allow_pickle=True).item().
            np.save('OptimizationResults.npy', OptimizationResults)

        finally:
            spark.stop()

    

Creating Spark session for (10, 8)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/23 08:06:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Reading the CSV...


                                                                                

+----------+------+------+-----+
|when      |hwid  |metric|value|
+----------+------+------+-----+
|1601824322|SW-115|S117  |0    |
|1601824322|SW-115|S115  |0    |
|1601824322|SW-115|S114  |1    |
+----------+------+------+-----+
only showing top 3 rows

Load CSV time:  41.54  seconds
Pivot dataset...


                                                                                

Persist the dataframe...




In [None]:
# Display the collected timings: {(core, partitions): {stage name: seconds}}.
OptimizationResults