In [None]:
from pyspark.sql import SparkSession

# Start the Spark session that every later cell uses.
# getOrCreate() hands back an already-running session instead of starting a second one.
spark = SparkSession.builder.appName("ML mit SparkML").getOrCreate()

In [None]:
# Load the aggregated HDD data. The first row supplies the column names, and
# inferSchema makes Spark take an extra pass over the file to pick column types.
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv('aggregated_HDD_Data.csv'))

df.printSchema()

In [None]:
from pyspark.ml.feature import StringIndexer

# Map each distinct brand string to a numeric index.
# StringIndexer's default frequencyDesc ordering gives the most frequent brand 0.0.
# The fitted model is kept in `brand_indexer` so the learned mapping stays inspectable.
brand_indexer = StringIndexer(inputCol="brand", outputCol="brand_indexed").fit(df)
df = brand_indexer.transform(df)

In [None]:
# Map each distinct drive-model string to a numeric index.
# The fitted StringIndexerModel is kept in `model_indexer`.
model_indexer = StringIndexer(inputCol="model", outputCol="model_indexed").fit(df)
df = model_indexer.transform(df)

In [None]:
df.select('model', 'model_indexed').show(5)  # spot-check the model -> index mapping

In [None]:
df = df.drop("brand").drop("model")  # raw strings are superseded by the *_indexed columns

In [None]:
# One-hot encode the indexed model and brand columns.
from pyspark.ml.feature import OneHotEncoder

# OneHotEncoder is an estimator: fitting learns the number of categories per input
# column. The fitted model (kept in `encoder`) appends one vector column per input.
# With the default dropLast=True, the last category encodes as the all-zero vector.
encoder = OneHotEncoder(
    inputCols=['model_indexed', 'brand_indexed'],
    outputCols=['model_onehot', 'brand_onehot'],
).fit(df)

# Encode, then discard the intermediate index columns in the same step.
df = encoder.transform(df).drop('model_indexed', 'brand_indexed')

In [None]:
# Spark ML estimators read the target from a column called "label" by default
# (labelCol="label"), so rename the failure column rather than configuring labelCol.
df = df.withColumnRenamed(existing="failure", new="label")

In [None]:
from pyspark.ml.feature import VectorAssembler

# Every remaining column except the drive identifier, days_live and the target is a feature.
# NOTE(review): days_live is presumably excluded because it leaks the target — confirm.
feature_cols = [
    c for c in df.columns
    if c not in ('serial_number', 'days_live', 'label')
]

# Pack the feature columns into the single "features" vector column that Spark ML models read.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df)

In [None]:
# 90/10 train/test split. The fixed seed keeps the split stable across re-runs on the same input.
df_train, df_test = df.randomSplit([0.9, 0.1], seed=42)

In [None]:
# Register the training split as a SQL view so its class balance can be queried.
df_train.createOrReplaceTempView("train_set")

# Rows per class in the training split.
# GROUP BY alone returns groups in no guaranteed order; ORDER BY makes the displayed
# table identical on every run.
spark.sql("""SELECT label, COUNT(label)
FROM train_set
GROUP BY label
ORDER BY label""").show()

In [None]:
# Collect the per-class row counts of the training split to pandas for the class-weight
# computation. The count column keeps Spark's generated name "count(label)".
# ORDER BY gives the collected frame a stable row order; GROUP BY alone does not
# guarantee one.
df_train_classes = spark.sql("""SELECT label, COUNT(label)
                                FROM train_set
                                GROUP BY label
                                ORDER BY label""").toPandas()
df_train_classes

In [None]:
# Total number of rows in the training split (numerator of the class weights).
df_train_count = df_train.count()
df_train_count

In [None]:
# Inverse-frequency class weights: weight(c) = n_train / n_c, so the rarer class gets
# the larger weight. Indexing by the label value lets later cells look a weight up by class.
df_train_classes.index = df_train_classes.loc[:, 'label']
weights = df_train_count / df_train_classes.loc[:, 'count(label)']

# Fail fast if the random split left the training set with a single class (possible
# with a very rare positive class). Otherwise the weight lookup for the missing class
# fails later with a bare KeyError.
assert len(weights) >= 2, (
    "training split must contain at least two label classes to weight; got "
    + str(list(weights.index)))

weights

In [None]:
from pyspark.sql.functions import when


def class_weight_column(label_col, class_weights):
    """Build a Column that maps each label value to its class weight.

    label_col: Column holding the class label.
    class_weights: pandas Series indexed by label value, with the weight as the value.

    Returns a CASE WHEN expression with one branch per entry in class_weights;
    label values not present in class_weights evaluate to null.
    Raises ValueError if class_weights is empty.
    """
    expr = None
    # .tolist() turns numpy scalars into plain Python int/float, which Py4J can
    # pass to the JVM (numpy integer scalars are not int subclasses).
    for label_value, weight in zip(class_weights.index.tolist(), class_weights.tolist()):
        condition = label_col == label_value
        expr = when(condition, weight) if expr is None else expr.when(condition, weight)
    if expr is None:
        raise ValueError("class_weights is empty")
    return expr


# One branch per class seen in training, instead of assuming labels are exactly
# {0, 1} and giving every non-zero label the class-1 weight.
df_train = df_train.withColumn("weights", class_weight_column(df_train["label"], weights))

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression that reads each row's sample weight from the "weights" column.
# With inverse-frequency weights, each class contributes equally to the loss.
model = LogisticRegression().setWeightCol("weights")

In [None]:
# Fit on the weighted training split, then score the held-out split.
# `model` is rebound from the estimator to the fitted LogisticRegressionModel.
model = model.fit(dataset=df_train)
df_test_pred = model.transform(dataset=df_test)

In [None]:
df_test_pred.select('prediction').show(n=10)  # first few predicted classes on the test split

In [None]:
# NOTE(review): df_test has no "weights" column; presumably the summary then treats test
# rows as unweighted — confirm for the Spark version in use.
pred_summary = model.evaluate(dataset=df_test)

In [None]:
# Test-set metrics. On imbalanced failure data, accuracy alone is misleading.
# Class reweighting trades precision for minority-class recall, so per-class
# precision is reported next to recall. Both are lists ordered by label index.
print('accuracy:', pred_summary.accuracy)
print('recall:', pred_summary.recallByLabel)
print('precision:', pred_summary.precisionByLabel)
print('AUC:', pred_summary.areaUnderROC)

In [None]:
spark.stop()  # shut down the Spark session; keep this as the final cell