In [9]:
from __future__ import print_function
import findspark
findspark.init()
from pyspark.sql import SparkSession, SQLContext, Row
import seaborn as sns
from pyspark.sql.functions import col, mean, monotonically_increasing_id, floor,row_number
from pyspark.sql.types import StructType,StructField, StringType
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.window import Window


In [12]:
if __name__ == "__main__":
    # Purpose: reduce every signal column of train.parquet to a short feature
    # vector (the mean of each consecutive ROWS_PER_BUCKET-row bucket) and write
    # one (features, signal_id) row per signal to FEATURES_OUTPUT_PATH.

    # Input table and output location.
    TRAIN_PARQUET_PATH = "C:/My_Data/MS/CS657/Project/InputData/train.parquet"
    FEATURES_OUTPUT_PATH = "C:/My_Data/MS/CS657/Project/InputData/featuresData/finalfeatures2.parquet"

    SIGNALS_PER_CHUNK = 500      # number of signal columns handled per loop iteration
    EXPECTED_ROW_COUNT = 800000  # number of rows the id sanity check expects in the table
    ROWS_PER_BUCKET = 80000      # consecutive rows averaged into a single feature value

    # Create a SparkSession (Note, the config section is only for Windows!)
    spark = (
        SparkSession.builder
        .master('local[*]')
        .config('spark.executor.memory', '12g')
        .config('spark.driver.memory', '12g')
        .config('spark.driver.maxResultSize', '12g')
        .config("spark.cores.max", "6")
        .appName("FaultDetection")
        .getOrCreate()
    )

    ################################# Visualization of train data ###################################################
    # Disabled exploration of metadata_train.csv, kept for reference.

    # Load up data as dataframe
    #data = spark.read.option("header", "true").option("inferSchema", "true").csv("C:/My_Data/MS/CS657/Project/InputData/metadata_train.csv")
    #data.limit(500)

#     notFaulty = data.select('signal_id').where(data.target == 0).count()
#     faulty = data.select('signal_id').where(data.target == 1).count()

#     # 8187 -  signals are not faulty, while 525 are faulty
#     print(notFaulty, faulty)

#     # phase wise distribution of faulty vs not faulty signals
#     notFaultyPhase0 = data.select('signal_id').where((data.target == 0) & (data.phase == 0)).count()
#     faultyPhase0 = data.select('signal_id').where((data.target == 1) & (data.phase == 0)).count()
#     print(notFaultyPhase0, faultyPhase0)

#     notFaultyPhase1 = data.select('signal_id').where((data.target == 0) & (data.phase == 1)).count()
#     faultyPhase1 = data.select('signal_id').where((data.target == 1) & (data.phase == 1)).count()
#     print(notFaultyPhase1, faultyPhase1)

#     notFaultyPhase2 = data.select('signal_id').where((data.target == 0) & (data.phase == 2)).count()
#     faultyPhase2 = data.select('signal_id').where((data.target == 1) & (data.phase == 2)).count()
#     print(notFaultyPhase2, faultyPhase2)

#     meta_data =  data.toPandas()
#     sns.set(style="darkgrid")
#     sns.countplot(x = 'target',hue = 'phase',data = meta_data)

    #################################################################################################################

    ################################# Feature Extraction ############################################################

    # try/finally guarantees the Spark session is stopped even when a step fails.
    try:
        # Create an empty dataframe with a schema; per-chunk results are unioned onto it.
        schema = StructType([StructField('features', VectorUDT(), True)])
        finalfeatures = spark.createDataFrame([], schema)

        # Open the table once; each iteration only selects a different column slice.
        rawSignals = spark.read.parquet(TRAIN_PARQUET_PATH)
        signalColumns = rawSignals.columns

        # Walk over all columns in slices, so the chunk count follows the table width.
        for start in range(0, len(signalColumns), SIGNALS_PER_CHUNK):
            stop = start + SIGNALS_PER_CHUNK
            signalData = rawSignals.select(signalColumns[start:stop])
            print(start, stop)
            signalData = signalData.withColumn("index", monotonically_increasing_id())

            # monotonically_increasing_id() is not guaranteed to be consecutive;
            # the bucketing below is only valid when the ids span exactly
            # EXPECTED_ROW_COUNT consecutive values, so verify that first.
            indexes = signalData.select(col("index"))
            print(indexes)
            minindex = indexes.head()['index']
            if indexes.tail(1)[0]['index'] - minindex != EXPECTED_ROW_COUNT - 1:
                # Raise instead of calling exit(): exit() would shut down the
                # notebook kernel; the finally clause below stops Spark.
                raise RuntimeError("ID assign error")

            # Shift ids to start at 0, map each row to its bucket number and
            # average every column per bucket.
            signalData = signalData.withColumn('index', signalData['index'] - minindex)
            signalData = (
                signalData
                .withColumn('index', floor(signalData['index'] / ROWS_PER_BUCKET))
                .groupBy('index')
                .avg()
                .orderBy('index')
            )
            print((signalData.count(), len(signalData.columns)))
            # groupBy().avg() also averaged the bucket id itself; that column is not a feature.
            signalData = signalData.drop(col("avg(index)"))

            # Transpose on the driver via pandas: buckets become columns and
            # every signal becomes one row.
            signalDataWithFeatures = spark.createDataFrame(
                signalData.toPandas().set_index("index").transpose()
            )
            assembler = VectorAssembler(
                inputCols=list(signalDataWithFeatures.columns),
                outputCol="features",
            )

            features = assembler.transform(signalDataWithFeatures).select("features")

            features.show()
            print((features.count(), len(features.columns)))
            finalfeatures = finalfeatures.union(features)

        # Number the rows 0..N-1 as signal_id.
        # NOTE(review): this presumably follows the order in which chunks were
        # unioned (i.e. the original column order) - confirm the ordering is stable.
        finalfeatures = finalfeatures.withColumn(
            'signal_id',
            row_number().over(Window.orderBy(monotonically_increasing_id())) - 1
        )
        #finalfeatures.show(1000)
        print((finalfeatures.count(), len(finalfeatures.columns)))
        finalfeatures.write.parquet(FEATURES_OUTPUT_PATH)
    finally:
        spark.stop()
    #################################################################################################################

0 500
DataFrame[index: bigint]
(10, 502)
+--------------------+
|            features|
+--------------------+
|[10.809125,0.5727...|
|[6.6541875,15.337...|
|[-18.8354625,-17....|
|[-9.6913125,1.001...|
|[-9.7676625,-16.9...|
|[18.48585,15.1597...|
|[-19.958425,-19.1...|
|[12.9744125,2.229...|
|[6.595825,16.1692...|
|[-8.734875,5.6185...|
|[-13.418275,-20.3...|
|[20.193325,13.200...|
|[6.224725,14.8163...|
|[-18.17825,-16.41...|
|[10.9962625,0.774...|
|[-11.940825,-19.1...|
|[16.57915,12.3528...|
|[-6.1508,5.39575,...|
|[15.329475,7.5545...|
|[-1.6246375,10.01...|
+--------------------+
only showing top 20 rows

(500, 1)
500 1000
DataFrame[index: bigint]
(10, 502)
+--------------------+
|            features|
+--------------------+
|[18.3385125,16.82...|
|[-16.148425,-21.1...|
|[16.628425,8.7779...|
|[-0.3195,11.89708...|
|[-21.2122375,-15....|
|[3.7881625,-11.08...|
|[15.8534875,23.55...|
|[-15.902425,-6.14...|
|[-2.90755,-12.119...|
|[17.8725125,17.42...|
|[-15.879375,-21.1...|
|[16.8

(500, 1)
6500 7000
DataFrame[index: bigint]
(10, 502)
+--------------------+
|            features|
+--------------------+
|[-18.85165,-19.47...|
|[-2.3539375,9.471...|
|[13.9138875,8.486...|
|[-14.996875,-20.1...|
|[9.03355,-2.67711...|
|[8.755475,16.3427...|
|[-20.5168375,-15....|
|[-13.2632,-21.804...|
|[21.1414,15.48412...|
|[-9.4661375,4.440...|
|[12.5653625,19.55...|
|[2.5791625,-7.868...|
|[-18.0217,-14.085...|
|[-18.32515,-16.84...|
|[1.2358,9.6781875...|
|[14.6517875,3.857...|
|[-1.4590875,-10.5...|
|[-17.10015,-7.991...|
|[16.2297125,16.10...|
|[-18.3902875,-15....|
+--------------------+
only showing top 20 rows

(500, 1)
7000 7500
DataFrame[index: bigint]
(10, 502)
+--------------------+
|            features|
+--------------------+
|[-13.8807125,-1.4...|
|[19.1157875,15.64...|
|[-16.32085,-21.40...|
|[16.759825,8.7792...|
|[-1.19735,12.2587...|
|[-20.0538625,-15....|
|[7.6335375,-3.969...|
|[11.0039125,17.93...|
|[0.0579625,11.675...|
|[-20.44815,-22.42...|
|[17.1083125,6.