In [1]:
import os
import gc

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, RobustScaler
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array

pd.set_option('display.max_rows', 20)
pd.set_option('display.max_columns', 200)

%load_ext autoreload
%autoreload 2

In [2]:
DATA_DIR = '../data'
DATA_FILEPATH = os.path.join(DATA_DIR, 'harddrive_nodup')
CYCLE_ID_FILEPATH = os.path.join(DATA_DIR, 'cycle_id.csv')
CYCLE_ID_FAILURE_FILEPATH = os.path.join(DATA_DIR, 'cycle_id_failure.csv')
os.path.exists(DATA_FILEPATH)
PREPROCESSED_FILEPATH = os.path.join(DATA_DIR, 'harddrive_preprocessed')
TRAINING_FILEPATH = os.path.join(DATA_DIR, 'harddrive_training.csv')
os.path.exists(TRAINING_FILEPATH)

True

## Random seed

In [3]:
RANDOM_SEED = 42
test_frac = 0.1

## Load data with PySpark session

In [4]:
spark = (SparkSession.builder
         .config("spark.driver.memory", "12g")
         .appName("SparkSQL").getOrCreate())

your 131072x1 screen size is bogus. expect trouble
24/08/24 16:29:06 WARN Utils: Your hostname, cedric-yu-work resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/08/24 16:29:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 16:29:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
date_col = 'date'
target_label = 'failure'
id_cols = ['serial_number', 'model']
cycle_id_col = 'cycle_id'


In [6]:
df = (spark.read.option("header", "true")
      .option("inferSchema", "true")
      .csv(TRAINING_FILEPATH))
df = df.sort(id_cols + [date_col])


In [7]:
df.show(10)

24/08/24 15:20:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------------+------------------+----------+-------+--------------------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+------------+-------------------+------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+-------------+-------------+-------------+-------------+--------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+-----------------+------

In [8]:
df.count()

2130

In [7]:
gc.collect()

83

In [10]:
df.groupby([target_label]).count().show()

+-------+-----+
|failure|count|
+-------+-----+
|      1|  175|
|      0| 1955|
+-------+-----+



## Train-validation-test split, grouped by cycle_id

In [8]:
[df_train_cycle, df_val_cycle, df_test_cycle] = (
    df.select(cycle_id_col).dropDuplicates().sort(cycle_id_col)
    .randomSplit([1.- 2 * test_frac, test_frac, test_frac],
                 seed=RANDOM_SEED))

In [9]:
for ind_ in [df_train_cycle, df_val_cycle, df_test_cycle]:
    print(ind_.count())

146
18
11


In [10]:
df_train = df.join(df_train_cycle, on=cycle_id_col, how='inner').drop(*(id_cols + [date_col, cycle_id_col]))
df_val = df.join(df_val_cycle, on=cycle_id_col, how='inner').drop(*(id_cols + [date_col, cycle_id_col]))
df_test = df.join(df_test_cycle, on=cycle_id_col, how='inner').drop(*(id_cols + [date_col, cycle_id_col]))


In [11]:
for ind_ in [df_train, df_val, df_test]:
    print(ind_.count())

1775
174
181


## Scale features

In [13]:
num_feature_cols = [col for col in df_train.columns
            if col not in id_cols + [cycle_id_col, date_col, target_label]]

In [14]:
assembler = VectorAssembler(inputCols=num_feature_cols, outputCol='features')
scaler = RobustScaler(inputCol='features', outputCol='features_scaled')
pipeline = Pipeline(stages=[assembler, scaler])

In [15]:
scalerModel = pipeline.fit(df_train)


                                                                                

In [20]:
df_train_scaled = scalerModel.transform(df_train).select(*[target_label, 'features_scaled'])
df_train_scaled = (df_train_scaled.withColumn("features_scaled1", vector_to_array("features_scaled"))
.select([target_label] + [F.col("features_scaled1")[i].alias(col) for i, col in enumerate(num_feature_cols)]))

In [34]:
df_val_scaled = scalerModel.transform(df_val).select(*[target_label, 'features_scaled'])
df_val_scaled = (df_val_scaled.withColumn("features_scaled1", vector_to_array("features_scaled"))
.select([target_label] + [F.col("features_scaled1")[i].alias(col) for i, col in enumerate(num_feature_cols)]))

df_test_scaled = scalerModel.transform(df_test).select(*[target_label, 'features_scaled'])
df_test_scaled = (df_test_scaled.withColumn("features_scaled1", vector_to_array("features_scaled"))
.select([target_label] + [F.col("features_scaled1")[i].alias(col) for i, col in enumerate(num_feature_cols)]))

In [40]:
def scale_features(df, scalerModel, num_feature_cols):
    df_scaled = (scalerModel.transform(df)
                 .select(*[target_label, 'features_scaled'])
                 .withColumn("features_scaled_array",
                             vector_to_array("features_scaled"))
                 .select([target_label] + [
                     F.col("features_scaled_array")[i].alias(col)
                     for i, col in enumerate(num_feature_cols)]))

    return df_scaled

In [41]:
scale_features(df_test_scaled, scalerModel, num_feature_cols)

DataFrame[failure: int, capacity_bytes: double, smart_1_normalized: double, smart_1_raw: double, smart_3_normalized: double, smart_3_raw: double, smart_4_normalized: double, smart_4_raw: double, smart_5_normalized: double, smart_5_raw: double, smart_7_normalized: double, smart_7_raw: double, smart_9_normalized: double, smart_9_raw: double, smart_10_normalized: double, smart_10_raw: double, smart_12_normalized: double, smart_12_raw: double, smart_188_raw: double, smart_192_normalized: double, smart_192_raw: double, smart_193_normalized: double, smart_193_raw: double, smart_194_normalized: double, smart_194_raw: double, smart_197_normalized: double, smart_197_raw: double, smart_198_normalized: double, smart_198_raw: double, smart_199_raw: double, smart_240_raw: double, smart_241_raw: double, smart_242_raw: double, capacity_bytes_lag_1: double, capacity_bytes_lag_2: double, capacity_bytes_lag_3: double, smart_1_normalized_lag_1: double, smart_1_normalized_lag_2: double, smart_1_normalized

In [None]:
df_test_scaled = scalerModel.transform(df_test).select(*[target_label, 'features_scaled'])
df_test_scaled = (df_test_scaled.withColumn("features_scaled1", vector_to_array("features_scaled"))
.select([target_label] + [F.col("features_scaled1")[i].alias(col) for i, col in enumerate(num_feature_cols)]))

In [21]:
df_train_scaled.show()

+-------+--------------+------------------+--------------------+------------------+-----------+------------------+------------------+------------------+-----------+------------------+-----------+------------------+------------------+-------------------+------------+-------------------+------------+-------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------+--------------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+------------------+----------------

In [86]:
df_train_scaled.take(1)

[Row(failure=0, capacity_bytes=inf, smart_1_normalized=14.0, smart_1_raw=0.00039892823795941573, smart_3_normalized=16.5, smart_3_raw=0.0, smart_4_raw=1.8235294117647058, smart_5_normalized=0.0, smart_5_raw=0.0, smart_7_normalized=6.25, smart_7_raw=nan, smart_9_normalized=5.764705882352941, smart_9_raw=1.2512780180888714, smart_10_normalized=0.0, smart_10_raw=0.0, smart_12_raw=1.875, smart_188_raw=0.0, smart_192_normalized=0.0, smart_192_raw=76.5, smart_193_normalized=5.263157894736842, smart_193_raw=0.01872750862934221, smart_194_normalized=2.9210526315789473, smart_194_raw=3.8571428571428568, smart_197_normalized=0.0, smart_197_raw=0.0, smart_198_normalized=0.0, smart_198_raw=0.0, smart_199_raw=0.0, smart_240_raw=nan, smart_241_raw=nan, smart_242_raw=nan, capacity_bytes_lag_1=inf, capacity_bytes_lag_2=inf, capacity_bytes_lag_3=inf, smart_1_normalized_lag_1=14.142857142857142, smart_1_normalized_lag_2=14.142857142857142, smart_1_normalized_lag_3=14.0, smart_1_raw_lag_1=1.2052628719302

In [22]:
df_train_scaled = (df_train_scaled
      .select([F.when(F.col(c).isin(["NA", 'NaN', 'nan', 'Infinity']), None)
               .otherwise(F.col(c))
               .alias(c)
               for c in df_train_scaled.columns]))

In [23]:
df_train_scaled.show()

+-------+--------------+------------------+--------------------+------------------+-----------+------------------+------------------+------------------+-----------+------------------+-----------+------------------+------------------+-------------------+------------+-------------------+------------+-------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+--------------------+-------------+--------------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+------------------+----------------

In [24]:
df_train_scaled.describe().show()

24/08/24 16:39:23 WARN DAGScheduler: Broadcasting large task binary with size 1548.1 KiB
[Stage 127:>                                                        (0 + 1) / 1]

+-------+-------------------+--------------+------------------+------------------+------------------+-----------+------------------+--------------------+------------------+-----------+------------------+-----------+------------------+-------------------+-------------------+------------+-------------------+------------------+-------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+--------------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+------------------+------------------+-------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+--------

                                                                                

In [25]:
na_frac = df_train_scaled.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)
                        for c in df_train_scaled.columns]).cache()

In [26]:
na_frac.show()

+-------+--------------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+------------------+-----------+-------------------+------------+-------------------+------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+--------------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+----

In [27]:
def get_less_na_cols(df, na_frac_cutoff: float = 0.1):
    num_samples = df.count()
    na_frac = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)
                         for c in df.columns]).cache()
    for field in na_frac.schema.fields:
        name = str(field.name)
        na_frac = na_frac.withColumn(name, F.col(name) / num_samples)
    cols_to_keep = [date_col, target_label] + id_cols
    for key, value in na_frac.first().asDict().items():
        if value > na_frac_cutoff:
            continue
        elif key not in cols_to_keep:
            cols_to_keep.append(key)
    return cols_to_keep

In [28]:
na_frac_cutoff = 0.0
# drop columns with fraction of data with NULL larger than cutoff
cols_to_keep = get_less_na_cols(df_train_scaled, na_frac_cutoff=na_frac_cutoff)
cols_to_drop = [col for col in df_train_scaled.columns if col not in cols_to_keep]


24/08/24 16:39:33 WARN CacheManager: Asked to cache already cached data.


In [29]:
cols_to_drop

['capacity_bytes',
 'smart_7_raw',
 'smart_240_raw',
 'smart_241_raw',
 'smart_242_raw',
 'capacity_bytes_lag_1',
 'capacity_bytes_lag_2',
 'capacity_bytes_lag_3',
 'smart_7_raw_lag_1',
 'smart_7_raw_lag_2',
 'smart_7_raw_lag_3',
 'smart_240_raw_lag_1',
 'smart_240_raw_lag_2',
 'smart_240_raw_lag_3',
 'smart_241_raw_lag_1',
 'smart_241_raw_lag_2',
 'smart_241_raw_lag_3',
 'smart_242_raw_lag_1',
 'smart_242_raw_lag_2',
 'smart_242_raw_lag_3']

In [35]:
df_train_scaled1 = df_train_scaled.drop(*cols_to_drop)
df_val_scaled1 = df_val_scaled.drop(*cols_to_drop)
df_test_scaled1 = df_test_scaled.drop(*cols_to_drop)

In [31]:
df_train_scaled1.describe().show()

24/08/24 16:40:43 WARN DAGScheduler: Broadcasting large task binary with size 1334.8 KiB


+-------+-------------------+------------------+------------------+------------------+-----------+------------------+--------------------+------------------+-----------+------------------+------------------+-------------------+-------------------+------------+-------------------+------------------+-------------+--------------------+-----------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+--------------------+-------------+-------------+------------------------+------------------------+------------------------+------------------+------------------+-------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+--

                                                                                

In [36]:
df_val_scaled1.describe().show()

24/08/24 16:41:57 WARN DAGScheduler: Broadcasting large task binary with size 1245.5 KiB


+-------+-------------------+------------------+------------------+------------------+-----------+------------------+--------------------+------------------+-----------+-------------------+------------------+--------------------+-------------------+------------+-------------------+------------------+-------------+--------------------+----------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------------+--------------------+-------------+-------------+------------------------+------------------------+------------------------+-------------------+-------------------+-------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+

In [37]:
df_test_scaled1.describe().show()

24/08/24 16:43:15 WARN DAGScheduler: Broadcasting large task binary with size 1245.5 KiB


+-------+-------------------+------------------+-------------------+------------------+-----------+------------------+--------------------+------------------+-----------+------------------+------------------+-------------------+-------------------+------------+-------------------+------------------+-------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------------+--------------------+-------------+-------------+------------------------+------------------------+------------------------+------------------+------------------+-------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+------------------------+------------------------+------------------------+-

In [44]:
spark.stop()