# Extract Load Transform

Alejandro Villanueva Noriega


In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel, SparkConf
import pandas as pd
from datetime import datetime, timedelta
import seaborn as sns
import copy
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.feature import StandardScaler, VectorAssembler, PCA
from pyspark.mllib.linalg import SparseVector, DenseVector, VectorUDT
from pyspark.ml.classification import LogisticRegression

%matplotlib inline

In [2]:
df = spark.read.parquet('hdd_dataset_new.parquet')

new_cols = [col.replace('.', '_') for col in df.columns]
df = df.toDF(*new_cols)
new_cols = [col.replace('-', '_') for col in df.columns]
df = df.toDF(*new_cols)


udf_replace = udf(lambda x: x.replace('-','.'),returnType=StringType())
df = df.withColumn("node", udf_replace(df.node))

def abreviating_disk(a):
    return ".".join(a.split(".", 2)[:2])

abreviating_disk_UDF = udf(lambda z: abreviating_disk(z),StringType())

df = df.withColumn("node", abreviating_disk_UDF('node'))


df_labels = spark.read.csv("labels_processed.csv", header=True)
df_labels.createOrReplaceTempView("data")
labelspd = spark.sql("SELECT CONCAT(node, '-',  disk)  FROM data").toPandas()


node_disk = map(lambda x:x.encode("utf-8"),[item for sublist in labelspd.values.tolist() for item in sublist])
df = df.withColumn('nodedisk',concat(df.node,lit('-'), df.disk))
df = df.filter(df.nodedisk.isin(node_disk) == True)
df = df.drop('nodedisk')
print 'El número de filas del DF es de: ' + str(df.count())
df.cache()

El número de filas del DF es de: 100


DataFrame[node: string, disk: string, start_time: bigint, end_time: bigint, smart_Calibration_Retry_Count_raw_value: map<int,float>, smart_Command_Timeout_raw_value: map<int,float>, smart_Current_Pending_Sector_raw_value: map<int,float>, smart_End_to_End_Error_raw_value: map<int,float>, smart_G_Sense_Error_Rate_raw_value: map<int,float>, smart_Hardware_ECC_Recovered_raw_value: map<int,float>, smart_High_Fly_Writes_raw_value: map<int,float>, smart_Load_Cycle_Count_raw_value: map<int,float>, smart_Media_Wearout_Indicator_raw_value: map<int,float>, smart_Multi_Zone_Error_Rate_raw_value: map<int,float>, smart_Offline_Uncorrectable_raw_value: map<int,float>, smart_Power_Off_Retract_Count_raw_value: map<int,float>, smart_Power_Cycle_Count_raw_value: map<int,float>, smart_Power_On_Hours_raw_value: map<int,float>, smart_Program_Fail_Cnt_Total_raw_value: map<int,float>, smart_Raw_Read_Error_Rate_raw_value: map<int,float>, smart_Reallocated_Event_Count_raw_value: map<int,float>, smart_Reallocate

In [3]:
important_attributes = ['smart_Reallocated_Sector_Ct_raw_value','smart_Power_Cycle_Count_raw_value',
                        'smart_Reported_Uncorrect_raw_value', 'smart_Command_Timeout_raw_value',
                        'smart_High_Fly_Writes_raw_value', 'smart_Offline_Uncorrectable_raw_value',
                        'smart_UDMA_CRC_Error_Count_raw_value', 'label'
                       ]

In [4]:
#df = df.where("node='c13.1'").where("disk='sdb'")
# Si queremos pintar un disco en concreto

In [5]:
columnas_initial = df.columns[:]
columnas_a_fusionar_copy = df.columns[4:]

In [6]:
columnas_a_fusionar = df.columns[4:]


def merging(x):
    return dict((key,d[key]) for d in x for key in d)

merging_UDF = udf(lambda z: merging(z),MapType(IntegerType(), FloatType()))

exprs = {x: "collect_list" for x in columnas_a_fusionar}
df = df.groupBy("node","disk", "start_time", "end_time").agg(exprs)

df.cache()
df = df.toDF(*columnas_initial)

print 'El número de filas del DF es de: ' + str(df.count())

El número de filas del DF es de: 61


In [7]:
cols = [merging_UDF(s) for s in df.columns[4:]]
cols.insert(0, df.columns[0])
cols.insert(1, df.columns[1])
cols.insert(2, df.columns[2])
cols.insert(3, df.columns[3])

print 'El número de filas del DF es de: ' + str(df.count())

El número de filas del DF es de: 61


In [8]:
df = df.select(*cols)
df = df.toDF(*columnas_initial)
df.cache()

DataFrame[node: string, disk: string, start_time: bigint, end_time: bigint, smart_Calibration_Retry_Count_raw_value: map<int,float>, smart_Command_Timeout_raw_value: map<int,float>, smart_Current_Pending_Sector_raw_value: map<int,float>, smart_End_to_End_Error_raw_value: map<int,float>, smart_G_Sense_Error_Rate_raw_value: map<int,float>, smart_Hardware_ECC_Recovered_raw_value: map<int,float>, smart_High_Fly_Writes_raw_value: map<int,float>, smart_Load_Cycle_Count_raw_value: map<int,float>, smart_Media_Wearout_Indicator_raw_value: map<int,float>, smart_Multi_Zone_Error_Rate_raw_value: map<int,float>, smart_Offline_Uncorrectable_raw_value: map<int,float>, smart_Power_Off_Retract_Count_raw_value: map<int,float>, smart_Power_Cycle_Count_raw_value: map<int,float>, smart_Power_On_Hours_raw_value: map<int,float>, smart_Program_Fail_Cnt_Total_raw_value: map<int,float>, smart_Raw_Read_Error_Rate_raw_value: map<int,float>, smart_Reallocated_Event_Count_raw_value: map<int,float>, smart_Reallocate

In [9]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_End_to_End_Error_raw_value,smart_G_Sense_Error_Rate_raw_value,smart_Hardware_ECC_Recovered_raw_value,...,smart_Seek_Error_Rate_raw_value,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value
0,c14.11,sdl,1514764800,1622505600,{},"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...",{},{},{},...,"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...",{},"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...",{},"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 26.0, 1549008901: 24.0, 156447540...","{1592524802: 15963.0, 1549008901: 3886.0, 1564...",{}


In [10]:
disk_crct = spark.read.csv("disk_models_formatted.csv", header=True)
disk_crct.show(10)

+----+--------+---------------+-----+
|disk|capacity|           type| node|
+----+--------+---------------+-----+
| sda|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdb|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdc|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdd|    1.8T|WD2000FYYZ-23UL|c14.4|
| sde|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdf|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdg|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdh|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdi|    1.8T|WD2000FYYZ-23UL|c14.4|
| sdj|    1.8T|WD2000FYYZ-23UL|c14.4|
+----+--------+---------------+-----+
only showing top 10 rows



In [11]:
disk_crct = disk_crct.withColumnRenamed('node', 'b_node')
disk_crct = disk_crct.withColumnRenamed('disk', 'b_disk')
disk_crct.show()

+------+--------+---------------+------+
|b_disk|capacity|           type|b_node|
+------+--------+---------------+------+
|   sda|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdb|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdc|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdd|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sde|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdf|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdg|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdh|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdi|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdj|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdk|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sdl|    1.8T|WD2000FYYZ-23UL| c14.4|
|   sda|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdb|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdc|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdd|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sde|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdf|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdg|    1.8T|WD2000FYYZ-23UL| c14.7|
|   sdh|    1.8T|WD2000FYYZ-23UL| c14.7|
+------+--------+---------------+------+
only showing top

In [12]:
df = df.join(disk_crct, (df.node == disk_crct.b_node) & (df.disk == disk_crct.b_disk), how='left')

In [13]:
df = df.drop('b_disk').drop('b_node')

In [14]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_End_to_End_Error_raw_value,smart_G_Sense_Error_Rate_raw_value,smart_Hardware_ECC_Recovered_raw_value,...,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value,capacity,type
0,c14.11,sdl,1514764800,1622505600,{},"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...",{},{},{},...,"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...",{},"{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 0.0, 1549008901: 0.0, 1564475402:...","{1592524802: 26.0, 1549008901: 24.0, 156447540...","{1592524802: 15963.0, 1549008901: 3886.0, 1564...",{},,


In [15]:
df = df.withColumn('start_time', from_unixtime('start_time').cast(StringType()))
df = df.withColumn('end_time', from_unixtime('end_time').cast(StringType()))

def converting_timestamp(x):
    return {datetime.utcfromtimestamp(key): value for key, value in x.iteritems()}

converting_timestamp_UDF = udf(lambda x: converting_timestamp(x),returnType=MapType(TimestampType(), FloatType()))

In [16]:
cols = [converting_timestamp_UDF(s) for s in df.columns[4:34]]
cols.insert(0, df.columns[0])
cols.insert(1, df.columns[1])
cols.insert(2, df.columns[2])
cols.insert(3, df.columns[3])
cols.insert(4, df.columns[34])
cols.insert(5, df.columns[35])

columnas_initial.insert(4, df.columns[34])
columnas_initial.insert(5, df.columns[35])

print 'El número de filas del DF es de: ' + str(df.count())

El número de filas del DF es de: 61


In [17]:
df = df.select(*cols)
df = df.toDF(*columnas_initial)
df.cache()

DataFrame[node: string, disk: string, start_time: string, end_time: string, capacity: string, type: string, smart_Calibration_Retry_Count_raw_value: map<timestamp,float>, smart_Command_Timeout_raw_value: map<timestamp,float>, smart_Current_Pending_Sector_raw_value: map<timestamp,float>, smart_End_to_End_Error_raw_value: map<timestamp,float>, smart_G_Sense_Error_Rate_raw_value: map<timestamp,float>, smart_Hardware_ECC_Recovered_raw_value: map<timestamp,float>, smart_High_Fly_Writes_raw_value: map<timestamp,float>, smart_Load_Cycle_Count_raw_value: map<timestamp,float>, smart_Media_Wearout_Indicator_raw_value: map<timestamp,float>, smart_Multi_Zone_Error_Rate_raw_value: map<timestamp,float>, smart_Offline_Uncorrectable_raw_value: map<timestamp,float>, smart_Power_Off_Retract_Count_raw_value: map<timestamp,float>, smart_Power_Cycle_Count_raw_value: map<timestamp,float>, smart_Power_On_Hours_raw_value: map<timestamp,float>, smart_Program_Fail_Cnt_Total_raw_value: map<timestamp,float>, smar

In [18]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,capacity,type,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_End_to_End_Error_raw_value,...,smart_Seek_Error_Rate_raw_value,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value
0,c14.11,sdl,2018-01-01 01:00:00,2021-06-01 02:00:00,,,{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},...,"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 25.0, 2019-11-10 19:30:0...","{2020-03-09 05:30:02: 13522.0, 2019-11-10 19:3...",{}


In [19]:
df_labels = df_labels.groupBy("node","disk").agg(min('timestamp').alias('timestamp_fail'))
df_labels = df_labels.withColumnRenamed('node', 'b_node')
df_labels = df_labels.withColumnRenamed('disk', 'b_disk')
df = df.join(df_labels, (df.node == df_labels.b_node) & (df.disk == df_labels.b_disk), how='left')
df = df.drop('b_disk').drop('b_node')
df.cache()

DataFrame[node: string, disk: string, start_time: string, end_time: string, capacity: string, type: string, smart_Calibration_Retry_Count_raw_value: map<timestamp,float>, smart_Command_Timeout_raw_value: map<timestamp,float>, smart_Current_Pending_Sector_raw_value: map<timestamp,float>, smart_End_to_End_Error_raw_value: map<timestamp,float>, smart_G_Sense_Error_Rate_raw_value: map<timestamp,float>, smart_Hardware_ECC_Recovered_raw_value: map<timestamp,float>, smart_High_Fly_Writes_raw_value: map<timestamp,float>, smart_Load_Cycle_Count_raw_value: map<timestamp,float>, smart_Media_Wearout_Indicator_raw_value: map<timestamp,float>, smart_Multi_Zone_Error_Rate_raw_value: map<timestamp,float>, smart_Offline_Uncorrectable_raw_value: map<timestamp,float>, smart_Power_Off_Retract_Count_raw_value: map<timestamp,float>, smart_Power_Cycle_Count_raw_value: map<timestamp,float>, smart_Power_On_Hours_raw_value: map<timestamp,float>, smart_Program_Fail_Cnt_Total_raw_value: map<timestamp,float>, smar

In [20]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,capacity,type,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_End_to_End_Error_raw_value,...,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value,timestamp_fail
0,c14.11,sdl,2018-01-01 01:00:00,2021-06-01 02:00:00,,,{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},...,{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 25.0, 2019-11-10 19:30:0...","{2020-03-09 05:30:02: 13522.0, 2019-11-10 19:3...",{},2018-12-20 00:00:00


In [21]:
def filtering_dates_dict(x,y):
    
    #return {k: v for k, v in x.items() if k <= (datetime.strptime(y, "%Y-%m-%d %H:%M:%S")+timedelta(days=-7))}
    return {k: v for k, v in x.items() if k <= (datetime.strptime(y, "%Y-%m-%d %H:%M:%S"))}
    #return {k: v for k, v in x.items() if True} si no queremos filtrar hasta el flallo
    

filtering_UDF = udf(lambda x,y: filtering_dates_dict(x,y),returnType = MapType(TimestampType(), FloatType()))

columnas_initial.insert(6, df.columns[36])

In [22]:
cols = [filtering_UDF(s,df.columns[36]) for s in df.columns[6:36]]
cols.insert(0, df.columns[0])
cols.insert(1, df.columns[1])
cols.insert(2, df.columns[2])
cols.insert(3, df.columns[3])
cols.insert(4, df.columns[4])
cols.insert(5, df.columns[5])
cols.insert(6, df.columns[36])

In [23]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,capacity,type,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_End_to_End_Error_raw_value,...,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value,timestamp_fail
0,c14.11,sdl,2018-01-01 01:00:00,2021-06-01 02:00:00,,,{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},...,{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...",{},"{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 0.0, 2019-11-10 19:30:02...","{2020-03-09 05:30:02: 25.0, 2019-11-10 19:30:0...","{2020-03-09 05:30:02: 13522.0, 2019-11-10 19:3...",{},2018-12-20 00:00:00


In [24]:
df = df.select(*cols)
df = df.toDF(*columnas_initial)
df.cache()

DataFrame[node: string, disk: string, start_time: string, end_time: string, capacity: string, type: string, timestamp_fail: string, smart_Calibration_Retry_Count_raw_value: map<timestamp,float>, smart_Command_Timeout_raw_value: map<timestamp,float>, smart_Current_Pending_Sector_raw_value: map<timestamp,float>, smart_End_to_End_Error_raw_value: map<timestamp,float>, smart_G_Sense_Error_Rate_raw_value: map<timestamp,float>, smart_Hardware_ECC_Recovered_raw_value: map<timestamp,float>, smart_High_Fly_Writes_raw_value: map<timestamp,float>, smart_Load_Cycle_Count_raw_value: map<timestamp,float>, smart_Media_Wearout_Indicator_raw_value: map<timestamp,float>, smart_Multi_Zone_Error_Rate_raw_value: map<timestamp,float>, smart_Offline_Uncorrectable_raw_value: map<timestamp,float>, smart_Power_Off_Retract_Count_raw_value: map<timestamp,float>, smart_Power_Cycle_Count_raw_value: map<timestamp,float>, smart_Power_On_Hours_raw_value: map<timestamp,float>, smart_Program_Fail_Cnt_Total_raw_value: ma

In [25]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,capacity,type,timestamp_fail,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,...,smart_Seek_Error_Rate_raw_value,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value
0,c14.11,sdl,2018-01-01 01:00:00,2021-06-01 02:00:00,,,2018-12-20 00:00:00,{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",...,"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 24.0, 2018-11-27 04:30:0...","{2018-05-01 13:30:02: 3166.0, 2018-11-27 04:30...",{}


In [26]:
toexplode = df.columns[9:]
res = df.select('node','disk', 'start_time','end_time',
                'capacity', 'type','timestamp_fail',explode(df.columns[8]).alias("datetime", df.columns[8]))

for i in toexplode:
    ddf = df.select(df.node.alias('node_b'),df.disk.alias('disk_b'),explode(i).alias("datetime_b", i))
    res = res.join(ddf, (ddf.node_b == res.node) & (ddf.disk_b == res.disk)& (ddf.datetime_b == res.datetime), how='outer')
    res = res.drop('node_b','disk_b')
    res = res.withColumn('datetime',coalesce(res.datetime,res.datetime_b))
    res = res.drop('datetime_b')
res.cache()

DataFrame[node: string, disk: string, start_time: string, end_time: string, capacity: string, type: string, timestamp_fail: string, datetime: timestamp, smart_Command_Timeout_raw_value: float, smart_Current_Pending_Sector_raw_value: float, smart_End_to_End_Error_raw_value: float, smart_G_Sense_Error_Rate_raw_value: float, smart_Hardware_ECC_Recovered_raw_value: float, smart_High_Fly_Writes_raw_value: float, smart_Load_Cycle_Count_raw_value: float, smart_Media_Wearout_Indicator_raw_value: float, smart_Multi_Zone_Error_Rate_raw_value: float, smart_Offline_Uncorrectable_raw_value: float, smart_Power_Off_Retract_Count_raw_value: float, smart_Power_Cycle_Count_raw_value: float, smart_Power_On_Hours_raw_value: float, smart_Program_Fail_Cnt_Total_raw_value: float, smart_Raw_Read_Error_Rate_raw_value: float, smart_Reallocated_Event_Count_raw_value: float, smart_Reallocated_Sector_Ct_raw_value: float, smart_Reported_Uncorrect_raw_value: float, smart_Runtime_Bad_Block_raw_value: float, smart_See

In [27]:
res.count()

3067082

In [28]:
df.limit(1).toPandas()

Unnamed: 0,node,disk,start_time,end_time,capacity,type,timestamp_fail,smart_Calibration_Retry_Count_raw_value,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,...,smart_Seek_Error_Rate_raw_value,smart_Spin_Retry_Count_raw_value,smart_Spin_Up_Time_raw_value,smart_Start_Stop_Count_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value,smart_Wear_Leveling_Count_raw_value
0,c14.11,sdl,2018-01-01 01:00:00,2021-06-01 02:00:00,,,2018-12-20 00:00:00,{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",...,"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...",{},"{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 0.0, 2018-11-27 04:30:02...","{2018-05-01 13:30:02: 24.0, 2018-11-27 04:30:0...","{2018-05-01 13:30:02: 3166.0, 2018-11-27 04:30...",{}


In [29]:
res = res.drop('start_time','end_time')

In [30]:
res.where('smart_Command_Timeout_raw_value != 0').select(res.columns[:7]).limit(10).show()

+------+----+--------+-----------+-------------------+-------------------+-------------------------------+
|  node|disk|capacity|       type|     timestamp_fail|           datetime|smart_Command_Timeout_raw_value|
+------+----+--------+-----------+-------------------+-------------------+-------------------------------+
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-03-16 03:30:01|                           12.0|
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-03-16 17:00:01|                           12.0|
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-03-17 05:15:01|                           12.0|
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-03-19 03:45:01|                           12.0|
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-04-21 23:00:01|                           30.0|
|c13.14| sdd|    1.8T|MG04ACA200N|2018-06-15 00:00:00|2018-04-24 13:45:01|                           30.0|
|c13.14| sdd|    1.8T|MG04ACA200N|201

In [31]:
res = res.where('type = "WDC WD2003FZEX-0"')

In [32]:
siemprenulos = []
for i,j in zip(map(lambda x: x+' is not null',res.columns[6:35]),res.columns[6:35]) :
    cuenta = res.where(i).count()
    print j + ' : ' + str(cuenta)
    if cuenta == 0:
        siemprenulos.append(j)

smart_Command_Timeout_raw_value : 1609317
smart_Current_Pending_Sector_raw_value : 1609317
smart_End_to_End_Error_raw_value : 0
smart_G_Sense_Error_Rate_raw_value : 0
smart_Hardware_ECC_Recovered_raw_value : 0
smart_High_Fly_Writes_raw_value : 1609115
smart_Load_Cycle_Count_raw_value : 0
smart_Media_Wearout_Indicator_raw_value : 1609317
smart_Multi_Zone_Error_Rate_raw_value : 0
smart_Offline_Uncorrectable_raw_value : 1609317
smart_Power_Off_Retract_Count_raw_value : 1609317
smart_Power_Cycle_Count_raw_value : 1609317
smart_Power_On_Hours_raw_value : 0
smart_Program_Fail_Cnt_Total_raw_value : 1609317
smart_Raw_Read_Error_Rate_raw_value : 1609317
smart_Reallocated_Event_Count_raw_value : 1609317
smart_Reallocated_Sector_Ct_raw_value : 1609317
smart_Reported_Uncorrect_raw_value : 1609317
smart_Runtime_Bad_Block_raw_value : 1609317
smart_Seek_Error_Rate_raw_value : 1609115
smart_Spin_Retry_Count_raw_value : 0
smart_Spin_Up_Time_raw_value : 1609317
smart_Start_Stop_Count_raw_value : 0
smart

In [33]:
res = res.drop(*siemprenulos)

In [34]:
res = res.na.drop()

In [35]:
def adding_labels(real, fail):
    real = datetime.strptime(str(real), "%Y-%m-%d %H:%M:%S")
    fail = datetime.strptime(str(fail), "%Y-%m-%d %H:%M:%S")
    
    #if real>=fail+timedelta(days=-7) and real<=fail:
    #if real>=fail+timedelta(days=-2) and real<=fail:
    #En este caso el lag = 7
    if real>=fail+timedelta(days=-7) and real<=fail:
        return 1 #falla
    else:
        return 0 #nofalla

In [36]:
res.limit(5).toPandas()

Unnamed: 0,node,disk,capacity,type,timestamp_fail,datetime,smart_Command_Timeout_raw_value,smart_Current_Pending_Sector_raw_value,smart_High_Fly_Writes_raw_value,smart_Media_Wearout_Indicator_raw_value,...,smart_Reallocated_Sector_Ct_raw_value,smart_Reported_Uncorrect_raw_value,smart_Runtime_Bad_Block_raw_value,smart_Seek_Error_Rate_raw_value,smart_Spin_Up_Time_raw_value,smart_Temperature_Celsius_raw_value,smart_Total_LBAs_Read_raw_value,smart_UDMA_CRC_Error_Count_raw_value,smart_Unknown_Attribute_raw_value,smart_Unknown_SSD_Attribute_raw_value
0,c13.1,sdb,1.8T,WDC WD2003FZEX-0,2020-04-02 00:00:00,2018-01-26 21:15:01,0.0,1.0,812258600000.0,0.0,...,4258.0,49.0,46.0,0.0,11.0,0.0,0.0,0.0,24.0,13251.0
1,c13.1,sdb,1.8T,WDC WD2003FZEX-0,2020-04-02 00:00:00,2018-01-26 21:30:01,0.0,1.0,812258700000.0,0.0,...,4258.0,49.0,46.0,0.0,11.0,0.0,0.0,0.0,24.0,13251.0
2,c13.1,sdb,1.8T,WDC WD2003FZEX-0,2020-04-02 00:00:00,2018-01-27 19:30:01,0.0,1.0,812259300000.0,0.0,...,4258.0,49.0,46.0,0.0,11.0,0.0,0.0,0.0,25.0,13273.0
3,c13.1,sdb,1.8T,WDC WD2003FZEX-0,2020-04-02 00:00:00,2018-01-28 16:00:01,0.0,1.0,812271100000.0,0.0,...,4258.0,49.0,46.0,0.0,11.0,0.0,0.0,0.0,25.0,13294.0
4,c13.1,sdb,1.8T,WDC WD2003FZEX-0,2020-04-02 00:00:00,2018-01-31 14:15:02,0.0,1.0,812289100000.0,0.0,...,4258.0,49.0,46.0,0.0,11.0,0.0,0.0,0.0,24.0,13364.0


In [37]:
adding_labels_UDF = udf(lambda x,y: adding_labels(x,y),StringType())

In [38]:
res = res.withColumn('label',adding_labels_UDF(res.datetime,res.timestamp_fail))

In [39]:
res.createOrReplaceTempView("data2")

In [40]:
spark.sql("SELECT COUNT(*) FROM data2 WHERE LABEL=1").show()

+--------+
|count(1)|
+--------+
|   14415|
+--------+



In [41]:
spark.sql("SELECT COUNT(*) FROM data2 WHERE LABEL=0").show()

+--------+
|count(1)|
+--------+
| 1594700|
+--------+



In [42]:
#res.write.parquet("etl/etldone_1.parquet", mode='overwrite')