In [None]:
# Importieren der erforderlichen Bibliotheken und Module für Datenverarbeitung und maschinelles Lernen
from pyspark.sql import SparkSession
from pyspark.ml.feature import RobustScaler,VectorAssembler
from pyspark.ml.linalg import Vectors,DenseVector
from pyspark.sql.functions import col,max,min,udf, concat
from pyspark.sql.types import DoubleType
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,GBTClassifier,LinearSVC
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler


In [None]:
# Helper that pulls the first component out of a vector (wrapped as a Spark UDF further down)
def extract_from_vector(vec):
    """Return the first element of ``vec`` as a Python float.

    Args:
        vec: Indexable, sized sequence of numbers (for example a Spark ML
            vector or a plain list). May be ``None``.

    Returns:
        ``float(vec[0])``, or ``None`` when ``vec`` is ``None`` or empty, so
        that a null/empty input produces a null result instead of raising
        inside the UDF.
    """
    if vec is None or len(vec) == 0:
        return None
    return float(vec[0])

# Evaluate the predictions of a fitted model, producing one metrics row per class label
def analysis(predictions, model, label_col="Class"):
    """Print and collect precision, recall and F1 score for every true label.

    Args:
        predictions: DataFrame containing a ``prediction`` column and the
            true-label column named by ``label_col``.
        model: Short name of the model (e.g. ``"lr"``); copied unchanged into
            every result row under the key ``"Model"``.
        label_col: Name of the true-label column. Defaults to ``"Class"``.

    Returns:
        A list with one dict per distinct true label (in ascending label
        order) holding the keys ``"Label"``, ``"Precision"``, ``"Recall"``,
        ``"F1 Score"`` and ``"Model"``.
    """
    # Project down to the two needed columns before switching to the RDD API,
    # so unused columns are not converted into Python objects.
    predictionAndLabels = (
        predictions.select("prediction", label_col)
        .rdd.map(lambda row: (float(row[0]), float(row[1])))
    )

    # MulticlassMetrics expects an RDD of (prediction, label) pairs
    metrics = MulticlassMetrics(predictionAndLabels)

    # Distinct true labels found in the data; metrics are reported per label
    labels = predictionAndLabels.map(lambda pair: pair[1]).distinct().collect()

    list_dict_return = []
    for label in sorted(labels):
        precision = metrics.precision(label)
        recall = metrics.recall(label)
        f1Score = metrics.fMeasure(label)
        print(f"Label {label}: Precision = {precision}, Recall = {recall}, F1 Score = {f1Score}")
        list_dict_return.append({"Label":label, "Precision":precision,"Recall":recall, "F1 Score":f1Score, "Model":model})
    return list_dict_return

In [None]:
# Location and format of the input file
file_location = "/FileStore/tables/creditcard.csv"
file_type = "csv"

# Options for reading the CSV file
infer_schema = "True"
first_row_is_header = "true"
delimiter = ","

# Load the CSV file as a DataFrame
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Cast every column to DoubleType for the ML stages.
# One select() replaces the former withColumn() loop: each withColumn() call adds
# another projection to the query plan, which grows with the number of columns.
columns = df.columns
df = df.select([col(column).cast(DoubleType()).alias(column) for column in columns])


In [None]:
# RobustScaler works on vector columns, so "Amount" is first wrapped into a
# one-element vector column named "v_amount".
assembler = VectorAssembler(outputCol="v_amount", inputCols=["Amount"])
transformed_df = assembler.transform(df)

# Scale by the range between the 0.25 and 0.75 quantiles; no centering is applied.
scaler = RobustScaler(
    inputCol="v_amount",
    outputCol="scaledFeatures",
    withCentering=False,
    withScaling=True,
    lower=0.25,
    upper=0.75,
)
scalerModel = scaler.fit(transformed_df)
scaledData = scalerModel.transform(transformed_df)

# Unwrap the scaled one-element vector into a plain double column "Scal_Amount"
extract_udf = udf(extract_from_vector, DoubleType())
df_with_extracted_value = scaledData.withColumn(
    "Scal_Amount", extract_udf(col("scaledFeatures"))
)

# The intermediate vector columns are no longer needed
df = df_with_extracted_value.drop("scaledFeatures", "v_amount")

In [None]:
# Min-max scale "Time" into the range [0, 1].
# Both bounds come from a single aggregation, i.e. one Spark job instead of two.
# Note: min/max here are the pyspark.sql.functions aggregates imported at the top
# of the notebook, not the Python builtins.
min_time, max_time = df.select(min(col("Time")), max(col("Time"))).first()
df = df.withColumn("Scal_Time", (col("Time") - min_time) / (max_time - min_time))

In [None]:
# Summary statistics of the raw columns and their scaled counterparts,
# printed one table per column.
for column_name in ("Amount", "Scal_Amount", "Time", "Scal_Time"):
    df.select(column_name).describe().show()

In [None]:
# Model inputs: every column except the raw "Amount" and "Time" (their scaled
# versions stay in) and the label column itself.
label_column = "Class"
feature_columns = list(df.columns)
for excluded in ("Amount", "Time", label_column):
    feature_columns.remove(excluded)

# Pack all feature columns into a single vector column "features"
vector_assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
df_t = vector_assembler.transform(df)

# Seeded 80/10/10 split; the names follow the order of the weights
splits = df_t.randomSplit([0.80, 0.10, 0.10], seed=1)
train_data, test_data, validation_data = splits

# Accumulators for the per-label metrics: original data / downsampled data
list_analysis, list_analysis_down = [], []

In [None]:
# Logistic regression trained on the original training split,
# scored on the validation split.
lr = LogisticRegression(labelCol=label_column, featuresCol="features")
model = lr.fit(train_data)
predictions = model.transform(validation_data)
list_analysis += analysis(predictions, "lr")

In [None]:
# Random forest (10 trees) trained on the original training split,
# scored on the validation split.
rf = RandomForestClassifier(numTrees=10, featuresCol="features", labelCol=label_column)
model = rf.fit(train_data)
predictions = model.transform(validation_data)
list_analysis += analysis(predictions, "rf")

In [None]:
# Gradient-boosted trees (10 iterations) trained on the original training split,
# scored on the validation split.
gbt = GBTClassifier(maxIter=10, featuresCol="features", labelCol=label_column)
model = gbt.fit(train_data)
predictions = model.transform(validation_data)
list_analysis += analysis(predictions, "gbt")

In [None]:
# Linear SVM (10 iterations, regParam 0.1) trained on the original training split,
# scored on the validation split.
lsvc = LinearSVC(featuresCol="features", labelCol=label_column, regParam=0.1, maxIter=10)
model = lsvc.fit(train_data)
predictions = model.transform(validation_data)
list_analysis += analysis(predictions, "lsvc")

In [None]:
# Undersample class 0 so that both classes are roughly equally represented.
#
# Only the training split is balanced. test_data and validation_data were carved
# out of df_t, so sampling from df_t here would put rows of those hold-out sets
# into the balanced training data and leak them into any later evaluation on them.
class_1_df = train_data.filter(col(label_column) == 1)
class_0_df = train_data.filter(col(label_column) == 0)

class_1_count = class_1_df.count()
class_0_count = class_0_df.count()

# Share of class-0 rows to keep. Guarded against an empty class 0 (division by
# zero) and capped at 1.0, the maximum for sampling without replacement.
# A conditional is used for the cap because the builtin min() is shadowed by
# pyspark.sql.functions.min in this notebook.
sample_fraction = class_1_count / float(class_0_count) if class_0_count else 1.0
if sample_fraction > 1.0:
    sample_fraction = 1.0

# Seeded so that the balanced data set is the same on every run
class_0_sampled_df = class_0_df.sample(withReplacement=False, fraction=sample_fraction, seed=1)
balanced_df = class_1_df.union(class_0_sampled_df)

In [None]:
# Seeded 80/10/10 split of the balanced data; the names follow the order of the weights
splits_b = balanced_df.randomSplit([0.80, 0.10, 0.10], seed=1)
train_data_b, test_data_b, validation_data_b = splits_b

In [None]:
# Logistic regression trained on the balanced training split,
# scored on the balanced validation split.
lr = LogisticRegression(labelCol=label_column, featuresCol="features")
model = lr.fit(train_data_b)
predictions = model.transform(validation_data_b)
list_analysis_down += analysis(predictions, "lr")

In [None]:
# Random forest (10 trees) trained on the balanced training split,
# scored on the balanced validation split.
rf = RandomForestClassifier(numTrees=10, featuresCol="features", labelCol=label_column)
model = rf.fit(train_data_b)
predictions = model.transform(validation_data_b)
list_analysis_down += analysis(predictions, "rf")

In [None]:
# Gradient-boosted trees (10 iterations) trained on the balanced training split,
# scored on the balanced validation split.
gbt = GBTClassifier(maxIter=10, featuresCol="features", labelCol=label_column)
model = gbt.fit(train_data_b)
predictions = model.transform(validation_data_b)
list_analysis_down += analysis(predictions, "gbt")

In [None]:
# Linear SVM (10 iterations, regParam 0.1) trained on the balanced training split,
# scored on the balanced validation split.
lsvc = LinearSVC(featuresCol="features", labelCol=label_column, regParam=0.1, maxIter=10)
model = lsvc.fit(train_data_b)
predictions = model.transform(validation_data_b)
list_analysis_down += analysis(predictions, "lsvc")

In [None]:
# Turn the collected metric rows (original data) into a DataFrame and add a
# combined "Model_label" column built from the model name and the label.
rdd = spark.sparkContext.parallelize(list_analysis)
df_result = spark.createDataFrame(rdd)
model_label = concat(df_result["Model"], df_result["Label"])
df_result = df_result.withColumn("Model_label", model_label)
display(df_result)

In [None]:
# Turn the collected metric rows (downsampled data) into a DataFrame and add a
# combined "Model_label" column built from the model name and the label.
rdd = spark.sparkContext.parallelize(list_analysis_down)
df_result_down = spark.createDataFrame(rdd)
model_label_down = concat(df_result_down["Model"], df_result_down["Label"])
df_result_down = df_result_down.withColumn("Model_label", model_label_down)
display(df_result_down)

In [None]:
# Final check: GBT trained on the original training split, scored on the
# held-out test split.
# The estimator is bound to `gbt`, the name that is actually fitted below; it was
# previously assigned to `rf`, so the fit silently relied on a `gbt` object left
# over from an earlier cell.
gbt = GBTClassifier(labelCol=label_column, featuresCol="features", maxIter=10)
model = gbt.fit(train_data)
predictions = model.transform(test_data)
# This model was trained on the original (not downsampled) data, so its metrics
# go into list_analysis rather than list_analysis_down.
list_analysis.extend(analysis(predictions,"gbt"))

In [None]:
# Final check: GBT trained on the balanced training split, scored on the
# held-out test split of the original data.
# The estimator is bound to `gbt`, the name that is actually fitted below; it was
# previously assigned to `rf`, so the fit silently relied on a `gbt` object left
# over from an earlier cell.
# NOTE(review): these metrics are only meaningful if train_data_b shares no rows
# with test_data - confirm how balanced_df is built.
gbt = GBTClassifier(labelCol=label_column, featuresCol="features", maxIter=10)
model = gbt.fit(train_data_b)
predictions = model.transform(test_data)
list_analysis_down.extend(analysis(predictions,"gbt"))