In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import StorageLevel
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, \
                                        LogisticRegressionModel, RandomForestClassificationModel, GBTClassificationModel


spark = SparkSession.builder \
    .appName("Network_Flow_Classification") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.shuffle.partitions", "300") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()

In [2]:
base_df = spark.read.csv("reduced_datasetv2.csv", header=True, inferSchema=True)
base_df.show(5)

+---------------+-----------+-------------+-----------+--------+--------+--------+-------+---------+--------+---------+----------------+----------------+--------------------------+-----------+------------+-------+-------+----------------+-----------------+--------------+--------------+-----------------------+-----------------------+----------------------+---------------------+-----------------------+----------------------+-------------------------+-------------------------+------------------------+-------------------------+-------------------------+--------------------------+---------------------------+--------------+---------------+---------+--------------+------------+--------------+--------------+--------------------+-----+--------+---------------+
|  IPV4_SRC_ADDR|L4_SRC_PORT|IPV4_DST_ADDR|L4_DST_PORT|PROTOCOL|L7_PROTO|IN_BYTES|IN_PKTS|OUT_BYTES|OUT_PKTS|TCP_FLAGS|CLIENT_TCP_FLAGS|SERVER_TCP_FLAGS|FLOW_DURATION_MILLISECONDS|DURATION_IN|DURATION_OUT|MIN_TTL|MAX_TTL|LONGEST_FLOW_PKT|

In [3]:
df = base_df.drop("Attack", "Dataset")
df.printSchema()

root
 |-- IPV4_SRC_ADDR: string (nullable = true)
 |-- L4_SRC_PORT: integer (nullable = true)
 |-- IPV4_DST_ADDR: string (nullable = true)
 |-- L4_DST_PORT: integer (nullable = true)
 |-- PROTOCOL: integer (nullable = true)
 |-- L7_PROTO: double (nullable = true)
 |-- IN_BYTES: integer (nullable = true)
 |-- IN_PKTS: integer (nullable = true)
 |-- OUT_BYTES: integer (nullable = true)
 |-- OUT_PKTS: integer (nullable = true)
 |-- TCP_FLAGS: integer (nullable = true)
 |-- CLIENT_TCP_FLAGS: integer (nullable = true)
 |-- SERVER_TCP_FLAGS: integer (nullable = true)
 |-- FLOW_DURATION_MILLISECONDS: integer (nullable = true)
 |-- DURATION_IN: integer (nullable = true)
 |-- DURATION_OUT: integer (nullable = true)
 |-- MIN_TTL: integer (nullable = true)
 |-- MAX_TTL: integer (nullable = true)
 |-- LONGEST_FLOW_PKT: integer (nullable = true)
 |-- SHORTEST_FLOW_PKT: integer (nullable = true)
 |-- MIN_IP_PKT_LEN: integer (nullable = true)
 |-- MAX_IP_PKT_LEN: integer (nullable = true)
 |-- SRC_TO

In [4]:
df.count()

3810000

In [5]:
df2 = df.na.drop()
df2.count()

3810000

In [6]:
from pyspark.ml.feature import StringIndexer

si = StringIndexer(inputCols=["IPV4_SRC_ADDR", "IPV4_DST_ADDR"], outputCols=["IPV4_SRC_ADDR_Index", "IPV4_DST_ADDR_Index"])
si_fit = si.fit(df)

df_indexed = si_fit.transform(df)

In [7]:
df = df_indexed.drop("IPV4_SRC_ADDR", "IPV4_DST_ADDR")
df.printSchema()

root
 |-- L4_SRC_PORT: integer (nullable = true)
 |-- L4_DST_PORT: integer (nullable = true)
 |-- PROTOCOL: integer (nullable = true)
 |-- L7_PROTO: double (nullable = true)
 |-- IN_BYTES: integer (nullable = true)
 |-- IN_PKTS: integer (nullable = true)
 |-- OUT_BYTES: integer (nullable = true)
 |-- OUT_PKTS: integer (nullable = true)
 |-- TCP_FLAGS: integer (nullable = true)
 |-- CLIENT_TCP_FLAGS: integer (nullable = true)
 |-- SERVER_TCP_FLAGS: integer (nullable = true)
 |-- FLOW_DURATION_MILLISECONDS: integer (nullable = true)
 |-- DURATION_IN: integer (nullable = true)
 |-- DURATION_OUT: integer (nullable = true)
 |-- MIN_TTL: integer (nullable = true)
 |-- MAX_TTL: integer (nullable = true)
 |-- LONGEST_FLOW_PKT: integer (nullable = true)
 |-- SHORTEST_FLOW_PKT: integer (nullable = true)
 |-- MIN_IP_PKT_LEN: integer (nullable = true)
 |-- MAX_IP_PKT_LEN: integer (nullable = true)
 |-- SRC_TO_DST_SECOND_BYTES: double (nullable = true)
 |-- DST_TO_SRC_SECOND_BYTES: double (nullable

In [9]:
df = df.na.drop()
df.count()

vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)

corr = Correlation.corr(df_vector, vector_col)

matrix = corr.collect()[0][0] 
corr_matrix = matrix.toArray().tolist() 
corr_matrix_df = pd.DataFrame(data=corr_matrix, columns = df.columns, index=df.columns) 
corr_matrix_df.style.background_gradient(cmap='coolwarm')

Unnamed: 0,L4_SRC_PORT,L4_DST_PORT,PROTOCOL,L7_PROTO,IN_BYTES,IN_PKTS,OUT_BYTES,OUT_PKTS,TCP_FLAGS,CLIENT_TCP_FLAGS,SERVER_TCP_FLAGS,FLOW_DURATION_MILLISECONDS,DURATION_IN,DURATION_OUT,MIN_TTL,MAX_TTL,LONGEST_FLOW_PKT,SHORTEST_FLOW_PKT,MIN_IP_PKT_LEN,MAX_IP_PKT_LEN,SRC_TO_DST_SECOND_BYTES,DST_TO_SRC_SECOND_BYTES,RETRANSMITTED_IN_BYTES,RETRANSMITTED_IN_PKTS,RETRANSMITTED_OUT_BYTES,RETRANSMITTED_OUT_PKTS,SRC_TO_DST_AVG_THROUGHPUT,DST_TO_SRC_AVG_THROUGHPUT,NUM_PKTS_UP_TO_128_BYTES,NUM_PKTS_128_TO_256_BYTES,NUM_PKTS_256_TO_512_BYTES,NUM_PKTS_512_TO_1024_BYTES,NUM_PKTS_1024_TO_1514_BYTES,TCP_WIN_MAX_IN,TCP_WIN_MAX_OUT,ICMP_TYPE,ICMP_IPV4_TYPE,DNS_QUERY_ID,DNS_QUERY_TYPE,DNS_TTL_ANSWER,FTP_COMMAND_RET_CODE,Label,IPV4_SRC_ADDR_Index,IPV4_DST_ADDR_Index
L4_SRC_PORT,1.0,-0.075507,-0.014697,-0.232025,-0.003113,0.004705,0.001131,0.002777,0.175772,0.172958,0.144149,-0.336302,-0.249085,-0.024329,-0.029265,-0.02955,0.169018,-0.108013,0.364736,0.169018,0.0,0.000411,-0.004836,-0.005737,-0.016163,-0.019498,0.000925,0.013531,0.00985,-0.007184,-6.7e-05,-0.004813,0.000305,0.139869,0.168013,-0.062438,-0.062437,0.245133,0.047907,-0.004317,-0.019549,-0.299643,0.039236,0.072888
L4_DST_PORT,-0.075507,1.0,-0.210726,-0.175842,0.003127,-0.000546,0.00208,0.0044,-0.008081,-0.031079,0.072161,-0.258487,-0.178281,0.053748,-0.283796,-0.282946,-0.004473,-0.098176,0.025341,-0.004473,-0.0,0.003112,0.002541,0.005322,0.025394,0.034258,0.027228,0.072722,-0.003354,-0.009429,0.00173,0.001678,0.001667,0.046489,-0.034594,-0.047728,-0.047728,-0.099944,-0.016186,0.011536,-0.022467,-0.105834,-0.019183,-0.026698
PROTOCOL,-0.014697,-0.210726,1.0,0.612365,-0.001784,0.004071,-0.008031,-0.015257,-0.309481,-0.283463,-0.39475,0.117472,0.035049,-0.103885,-0.077725,-0.078675,-0.348178,-0.300649,0.021029,-0.348178,0.0,-0.000542,-0.005756,-0.020885,-0.029921,-0.042002,-0.121168,-0.076396,0.014527,-0.033723,-0.014909,-0.005599,-0.007996,-0.340217,-0.354368,-0.24239,-0.24239,0.403709,0.085282,0.00249,-0.056011,0.049075,-0.079536,-0.110608
L7_PROTO,-0.232025,-0.175842,0.612365,1.0,-0.005543,-0.006038,-0.00534,-0.010766,-0.185352,-0.165355,-0.267603,0.495696,0.222536,-0.042005,0.174807,0.174183,-0.27566,-0.446087,-0.475824,-0.27566,-0.0,-0.000426,-0.00419,-0.013805,-0.025415,-0.033583,-0.091859,-0.04669,-0.010178,-0.034985,-0.010149,-0.004772,-0.005344,-0.203885,-0.219924,-0.177319,-0.177319,-0.224389,-0.047499,-0.001417,-0.049651,0.382757,-0.049473,-0.034284
IN_BYTES,-0.003113,0.003127,-0.001784,-0.005543,1.0,0.51585,0.059944,0.261801,0.002022,0.002314,0.003878,0.003772,0.228285,0.011012,0.012691,0.01274,0.030394,0.01237,0.006881,0.030394,0.0,0.001083,0.224321,0.150695,0.00851,0.013231,0.179918,0.011625,0.177139,0.2167,0.461228,0.756653,0.175485,0.003207,0.003946,0.004483,0.00448,-0.003625,-0.000759,-2.2e-05,-0.000172,-0.005067,0.000864,0.004876
IN_PKTS,0.004705,-0.000546,0.004071,-0.006038,0.51585,1.0,0.111459,0.150921,0.001242,0.001649,0.002345,0.006022,0.585386,0.004607,0.018738,0.018849,0.010393,-0.000329,-0.002148,0.010393,0.0,0.003315,0.031604,0.033426,0.023953,0.027263,0.043271,0.023455,0.375344,0.063616,0.112224,0.14139,0.12593,0.00422,0.002909,0.002977,0.002976,-0.005038,-0.001052,-3.1e-05,0.000698,-0.000284,-0.000844,0.005058
OUT_BYTES,0.001131,0.00208,-0.008031,-0.00534,0.059944,0.111459,1.0,0.893097,0.007631,0.007943,0.012169,0.001877,-0.005819,0.00664,0.009259,0.009342,0.050551,-0.003113,0.010932,0.050551,-0.0,0.00034,0.000934,0.005458,0.154279,0.149985,0.040443,0.180927,0.042274,0.032821,0.118593,0.09568,0.942379,0.022966,0.012484,0.01008,0.01008,-0.003604,-0.000771,-2.3e-05,-0.000511,-0.013822,-0.000569,0.030483
OUT_PKTS,0.002777,0.0044,-0.015257,-0.010766,0.261801,0.150921,0.893097,1.0,0.018262,0.018635,0.025622,-0.003798,-0.011615,0.017312,0.015721,0.016011,0.055483,-0.006778,0.022432,0.055483,-0.0,0.011958,0.013488,0.017202,0.156188,0.161761,0.088397,0.175844,0.06779,0.165547,0.277878,0.220041,0.907917,0.035357,0.026272,0.020275,0.020275,-0.005891,-0.001367,-4.8e-05,0.004574,-0.0243,0.000107,0.037658
TCP_FLAGS,0.175772,-0.008081,-0.309481,-0.185352,0.002022,0.001242,0.007631,0.018262,1.0,0.995279,0.72423,-0.198575,-0.218295,0.024368,0.477262,0.476903,0.614632,-0.18327,0.249008,0.614632,-0.0,3.5e-05,0.000924,0.016422,0.002617,0.005697,0.156615,0.106376,-0.00324,0.02226,0.021276,0.006154,0.006868,0.396011,0.661976,0.067808,0.067808,-0.140491,-0.029681,-0.000867,-0.000243,-0.28171,0.016568,0.254799
CLIENT_TCP_FLAGS,0.172958,-0.031079,-0.283463,-0.165355,0.002314,0.001649,0.007943,0.018635,0.995279,1.0,0.700411,-0.187985,-0.210311,0.021372,0.499028,0.498563,0.625145,-0.165892,0.227575,0.625145,-0.0,6e-05,0.001147,0.017181,0.00393,0.007486,0.160075,0.109255,-0.002697,0.022733,0.021853,0.006407,0.007193,0.402923,0.664271,0.072347,0.072347,-0.128679,-0.027185,-0.000794,0.002312,-0.283832,0.015356,0.258421


In [10]:
final_df = df.select(["L4_SRC_PORT", "L7_PROTO", "SERVER_TCP_FLAGS", "FLOW_DURATION_MILLISECONDS", "DURATION_IN", "MIN_IP_PKT_LEN", "DNS_QUERY_ID", "Label"])
final_df.show(5)

+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|L4_SRC_PORT|L7_PROTO|SERVER_TCP_FLAGS|FLOW_DURATION_MILLISECONDS|DURATION_IN|MIN_IP_PKT_LEN|DNS_QUERY_ID|Label|
+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|      37995|     0.0|               0|                         0|          0|            65|       24394|    1|
|      22939|   188.0|               0|                   4293717|       1250|             0|           0|    1|
|      23934|     0.0|              27|                         0|          0|            52|           0|    0|
|      57340|     0.0|               0|                         0|          0|             0|           0|    1|
|      17007|     7.0|               0|                   4294358|        609|             0|           0|    1|
+-----------+--------+----------------+--------------------------+-----------+--------------+---

In [11]:
final_df.repartition(1).write.option("header", True).csv("NFC.csv")

In [15]:
df = spark.read.csv("NFC.csv", header=True, inferSchema=True)
df.show(5)

+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|L4_SRC_PORT|L7_PROTO|SERVER_TCP_FLAGS|FLOW_DURATION_MILLISECONDS|DURATION_IN|MIN_IP_PKT_LEN|DNS_QUERY_ID|Label|
+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|      37995|     0.0|               0|                         0|          0|            65|       24394|    1|
|      22939|   188.0|               0|                   4293717|       1250|             0|           0|    1|
|      23934|     0.0|              27|                         0|          0|            52|           0|    0|
|      57340|     0.0|               0|                         0|          0|             0|           0|    1|
|      17007|     7.0|               0|                   4294358|        609|             0|           0|    1|
+-----------+--------+----------------+--------------------------+-----------+--------------+---

In [16]:
df = df.limit(1_000_000)

input_cols = df.columns
input_cols.remove("Label")

assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
output = assembler.transform(df)

final_data = output.select(["features", "Label"])

In [5]:
train_data, val_data, test_data = final_data.randomSplit([0.2, 0.4, 0.4], seed=16)

### Random Forest

In [18]:
from time import time

startTime = time()

# Train the Random Forest model on the training data
rf = RandomForestClassifier(labelCol='Label')
rf_model = rf.fit(train_data)


endTime = time()
print(f"{endTime - startTime} seconds")

107.86075949668884 seconds


In [20]:
from pyspark.sql.types import IntegerType

# Validate the model on the validation set
val_predictions = rf_model.transform(val_data)
val_predictions = val_predictions.withColumn("prediction", val_predictions["prediction"].cast(IntegerType()))

In [22]:
# Calculate validation set performance metrics
val_predictions = val_predictions.withColumn("result", F.when((val_predictions["prediction"] == 1) & (val_predictions["Label"] == 1), 0) \
                                                .when((val_predictions["prediction"] == 0) & (val_predictions["Label"] == 1), 1) \
                                                .when((val_predictions["prediction"] == 1) & (val_predictions["Label"] == 0), 2) \
                                                .otherwise(3))

In [25]:
startTime = time()

tp = val_predictions.filter(val_predictions["result"] == 0).count()
fp = val_predictions.filter(val_predictions["result"] == 1).count()
fn = val_predictions.filter(val_predictions["result"] == 2).count()
tn = val_predictions.filter(val_predictions["result"] == 3).count()

endTime = time()
print(f"{endTime - startTime} seconds")

106.02873682975769 seconds


In [26]:
print(
f"True Positives: {tp}\n \
False Positives: {fp}\n \
False Negatives: {fn}\n \
True Negatives: {tn}"
)

True Positives: 931698
 False Positives: 87490
 False Negatives: 32530
 True Negatives: 471294


In [27]:
accuracy = (tp + tn) / (tp + fp + fn + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_score = (2 * precision * recall) / (precision + recall)

In [28]:
print(
f"Accuracy: {accuracy}\n \
Precision: {precision}\n \
Recall: {recall}\n \
F1-Score: {f1_score}"
)

Accuracy: 0.9211956307632507
 Precision: 0.9141571525567412
 Recall: 0.9662631659731931
 F1-Score: 0.9394882364566989


### Trying stuff

In [3]:
df = spark.read.csv("NFC.csv", header=True, inferSchema=True)
df.show(5)

+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|L4_SRC_PORT|L7_PROTO|SERVER_TCP_FLAGS|FLOW_DURATION_MILLISECONDS|DURATION_IN|MIN_IP_PKT_LEN|DNS_QUERY_ID|Label|
+-----------+--------+----------------+--------------------------+-----------+--------------+------------+-----+
|      37995|     0.0|               0|                         0|          0|            65|       24394|    1|
|      22939|   188.0|               0|                   4293717|       1250|             0|           0|    1|
|      23934|     0.0|              27|                         0|          0|            52|           0|    0|
|      57340|     0.0|               0|                         0|          0|             0|           0|    1|
|      17007|     7.0|               0|                   4294358|        609|             0|           0|    1|
+-----------+--------+----------------+--------------------------+-----------+--------------+---

In [8]:
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import functions as F
from time import time

startTime = time()

# Split the data into training (60%), validation (20%), and testing (20%) sets
train_data, val_data, test_data = final_data.randomSplit([0.2, 0.4, 0.4], seed=16)

# Define a function to calculate precision, recall, f1-score, and accuracy
def calculate_metrics(predictions):
    # Confusion matrix components
    tp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 1)).count()
    fp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 0)).count()
    tn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 0)).count()
    fn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 1)).count()

    # Accuracy: (TP + TN) / Total
    accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) != 0 else 0
    # Precision: TP / (TP + FP)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    # Recall: TP / (TP + FN)
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    # F1-Score: 2 * (Precision * Recall) / (Precision + Recall)
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return accuracy, precision, recall, f1_score

# Define a function to train and evaluate the model
def train_and_evaluate_rf(num_trees, max_depth, max_bins):
    # Initialize the RandomForestClassifier with manual hyperparameters
    rf = RandomForestClassifier(labelCol='Label', numTrees=num_trees, maxDepth=max_depth, maxBins=max_bins)
    
    # Train the model on the training data
    rf_model = rf.fit(train_data)
    
    # Validate the model on the validation set
    val_predictions = rf_model.transform(val_data)
    val_predictions = val_predictions.withColumn("prediction", val_predictions["prediction"].cast(IntegerType()))
    
    # Calculate accuracy, precision, recall, and f1-score
    accuracy, precision, recall, f1_score = calculate_metrics(val_predictions)
    
    return rf_model, accuracy, precision, recall, f1_score

# Create a list to store the results for all combinations
results = []

# Manually test different hyperparameters
best_model = None
best_accuracy = 0

# Try different combinations of hyperparameters
for num_trees in [10, 20, 50]:
    for max_depth in [5, 10, 15]:
        for max_bins in [32, 64]:
            rf_model, accuracy, precision, recall, f1_score = train_and_evaluate_rf(num_trees, max_depth, max_bins)
            
            # Store the results of each combination
            results.append((num_trees, max_depth, max_bins, accuracy, precision, recall, f1_score))
            
            print(f"Validation Accuracy for numTrees={num_trees}, maxDepth={max_depth}, maxBins={max_bins}: {accuracy}")
            print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")
            
            # Track the best model based on validation accuracy
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_model = rf_model

# Print all the results
print("\nAll combinations and their validation accuracy, precision, recall, F1-score:")
for num_trees, max_depth, max_bins, accuracy, precision, recall, f1_score in results:
    print(f"numTrees={num_trees}, maxDepth={max_depth}, maxBins={max_bins}, Accuracy={accuracy}, Precision={precision}, Recall={recall}, F1-Score={f1_score}")

print(f"\nBest Validation Accuracy: {best_accuracy}")

# Test the final best model on the test set
test_predictions = best_model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", test_predictions["prediction"].cast(IntegerType()))

# Calculate test set performance metrics
test_accuracy, test_precision, test_recall, test_f1_score = calculate_metrics(test_predictions)

print(f"Test Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}, Test Recall: {test_recall}, Test F1-Score: {test_f1_score}")

endTime = time()
print(f"{endTime - startTime} seconds")


Validation Accuracy for numTrees=10, maxDepth=5, maxBins=32: 0.9013835741281093
Precision: 0.9655810534527172, Recall: 0.884149931121638, F1-Score: 0.9230730619988281
Validation Accuracy for numTrees=10, maxDepth=5, maxBins=64: 0.9012824587068257
Precision: 0.9660048786662404, Recall: 0.883576925944968, F1-Score: 0.9229541715989976
Validation Accuracy for numTrees=10, maxDepth=10, maxBins=32: 0.9320248297452679
Precision: 0.9715609398866805, Recall: 0.9255132517258837, F1-Score: 0.9479782379623641
Validation Accuracy for numTrees=10, maxDepth=10, maxBins=64: 0.9362834961247843
Precision: 0.9634319049643636, Recall: 0.940483011966389, F1-Score: 0.9518191508957603
Validation Accuracy for numTrees=10, maxDepth=15, maxBins=32: 0.941747011842323
Precision: 0.9629682334654203, Recall: 0.949462709529547, F1-Score: 0.9561677837162724
Validation Accuracy for numTrees=10, maxDepth=15, maxBins=64: 0.9451534196710203
Precision: 0.9742108784235815, Recall: 0.9430036460397886, F1-Score: 0.9583532762

In [17]:
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import functions as F
from time import time
import itertools

startTime = time()

# Split the data into training (60%), validation (20%), and testing (20%) sets
train_data, val_data, test_data = final_data.randomSplit([0.2, 0.4, 0.4], seed=16)

# Repartition the data to reduce memory pressure
train_data = train_data.repartition(128)
val_data = val_data.repartition(128)
test_data = test_data.repartition(128)

# Persist the data with MEMORY_AND_DISK to allow spilling
train_data.persist(StorageLevel.MEMORY_AND_DISK)
val_data.persist(StorageLevel.MEMORY_AND_DISK)
test_data.persist(StorageLevel.MEMORY_AND_DISK)

# Define a function to calculate precision, recall, f1-score, and accuracy
def calculate_metrics(predictions):
    # Confusion matrix components
    tp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 1)).count()
    fp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 0)).count()
    tn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 0)).count()
    fn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 1)).count()

    # Accuracy: (TP + TN) / Total
    accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) != 0 else 0
    # Precision: TP / (TP + FP)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    # Recall: TP / (TP + FN)
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    # F1-Score: 2 * (Precision * Recall) / (Precision + Recall)
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return accuracy, precision, recall, f1_score

# Define a function to train and evaluate the model
def train_and_evaluate_rf(num_trees, max_depth, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset):
    # Start the timer for this run
    run_start_time = time()

    # Initialize the RandomForestClassifier with manual hyperparameters
    rf = RandomForestClassifier(
        labelCol='Label', 
        numTrees=num_trees, 
        maxDepth=max_depth, 
        maxBins=max_bins, 
        minInstancesPerNode=min_instances, 
        minInfoGain=min_info_gain, 
        subsamplingRate=subsampling_rate, 
        featureSubsetStrategy=feature_subset
    )

    # Train the model on the training data
    rf_model = rf.fit(train_data)

    # Validate the model on the validation set
    val_predictions = rf_model.transform(val_data)
    val_predictions = val_predictions.withColumn("prediction", val_predictions["prediction"].cast(IntegerType()))

    # Calculate accuracy, precision, recall, and f1-score
    accuracy, precision, recall, f1_score = calculate_metrics(val_predictions)

    # Calculate the time taken for this run
    run_time = time() - run_start_time

    return rf_model, accuracy, precision, recall, f1_score, run_time

# Create a list to store the results for all combinations
results = []

# Manually test different hyperparameters
best_model = None
best_accuracy = 0

# Fixed values for hyperparameters

max_depth = 15


# Define hyperparameter ranges for remaining parameters
hyperparams = {
    'num_trees' : [10, 50, 100],
    'max_bins' : [32, 64],
    'min_instances': [1, 5, 10, 20],            # Reduced from [1, 5, 10, 20] to [1, 10]
    'min_info_gain': [0.0, 0.01],        # Kept as is
    'subsampling_rate': [0.5, 0.7, 1.0],      # Reduced from [0.5, 0.7, 1.0] to [0.7, 1.0]
    'feature_subset': ['auto', 'sqrt', 'log2']   # Reduced from ['auto', 'sqrt', 'log2'] to ['auto', 'sqrt']
}

# Generate all possible combinations (Total: 2 * 2 * 2 * 2 = 16)
all_combinations = list(itertools.product(
    hyperparams['num_trees'],
    hyperparams['max_bins'],
    hyperparams['min_instances'],
    hyperparams['min_info_gain'],
    hyperparams['subsampling_rate'],
    hyperparams['feature_subset']
))

# Iterate over all combinations
for num_trees, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset in all_combinations:
    # Train and evaluate the model
    rf_model, accuracy, precision, recall, f1_score, run_time = train_and_evaluate_rf(
        num_trees, max_depth, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset
    )

    # Store the results of each combination
    results.append((num_trees, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset, accuracy, precision, recall, f1_score, run_time))

    # Print the metrics and the time taken for this run
    print(f"Validation Accuracy for numTrees={num_trees}, maxDepth={max_depth}, maxBins={max_bins}, "
          f"minInstances={min_instances}, minInfoGain={min_info_gain}, subsamplingRate={subsampling_rate}, "
          f"featureSubset={feature_subset}: {accuracy}")
    print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")
    print(f"Time taken for this run: {run_time:.2f} seconds\n")

    # Track the best model based on validation accuracy
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = rf_model

# Print all the results
print("\nAll combinations and their validation accuracy, precision, recall, F1-score, and run time:")
for num_trees, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset, accuracy, precision, recall, f1_score, run_time in results:
    print(f"numTrees={num_trees}, maxBins={max_bins}, minInstances={min_instances}, minInfoGain={min_info_gain}, "
          f"subsamplingRate={subsampling_rate}, featureSubset={feature_subset}, Accuracy={accuracy}, "
          f"Precision={precision}, Recall={recall}, F1-Score={f1_score}, Time={run_time:.2f} seconds")

print(f"\nBest Validation Accuracy: {best_accuracy}")

# Test the final best model on the test set
test_predictions = best_model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", test_predictions["prediction"].cast(IntegerType()))

# Calculate test set performance metrics
test_accuracy, test_precision, test_recall, test_f1_score = calculate_metrics(test_predictions)

print(f"\nTest Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}, Test Recall: {test_recall}, Test F1-Score: {test_f1_score}")

# Unpersist the data
train_data.unpersist()
val_data.unpersist()
test_data.unpersist()

endTime = time()
print(f"\nTotal time taken: {endTime - startTime:.2f} seconds")

Validation Accuracy for numTrees=10, maxDepth=15, maxBins=32, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=auto: 0.9430390526379065
Precision: 0.9687327082692899, Recall: 0.9452397378406348, F1-Score: 0.9568420413399424
Time taken for this run: 35.19 seconds

Validation Accuracy for numTrees=10, maxDepth=15, maxBins=32, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=sqrt: 0.9430390526379065
Precision: 0.9687327082692899, Recall: 0.9452397378406348, F1-Score: 0.9568420413399424
Time taken for this run: 21.94 seconds

Validation Accuracy for numTrees=10, maxDepth=15, maxBins=32, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=log2: 0.9430390526379065
Precision: 0.9687327082692899, Recall: 0.9452397378406348, F1-Score: 0.9568420413399424
Time taken for this run: 22.18 seconds

Validation Accuracy for numTrees=10, maxDepth=15, maxBins=32, minInstances=1, minInfoGain=0.0, subsamplingRate=0.7, featureSubset=auto: 0.9417391347907

In [13]:
test_predictions = best_model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", test_predictions["prediction"].cast(IntegerType()))

# Calculate test set performance metrics
test_accuracy, test_precision, test_recall, test_f1_score = calculate_metrics(test_predictions)

print(f"\nTest Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}, Test Recall: {test_recall}, Test F1-Score: {test_f1_score}")


Test Accuracy: 0.9455915199507765
Test Precision: 0.9749690450394676, Test Recall: 0.9428111728498999, Test F1-Score: 0.9586204928713418


In [16]:
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import functions as F
from time import time
import itertools

startTime = time()

# Split the data into training (60%), validation (20%), and testing (20%) sets
train_data, val_data, test_data = final_data.randomSplit([0.2, 0.4, 0.4], seed=16)

# Repartition the data to reduce memory pressure
train_data = train_data.repartition(128)
val_data = val_data.repartition(128)
test_data = test_data.repartition(128)

# Persist the data with MEMORY_AND_DISK to allow spilling
train_data.persist(StorageLevel.MEMORY_AND_DISK)
val_data.persist(StorageLevel.MEMORY_AND_DISK)
test_data.persist(StorageLevel.MEMORY_AND_DISK)

# Define a function to calculate precision, recall, f1-score, and accuracy
def calculate_metrics(predictions):
    # Confusion matrix components
    tp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 1)).count()
    fp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 0)).count()
    tn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 0)).count()
    fn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 1)).count()

    # Accuracy: (TP + TN) / Total
    accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) != 0 else 0
    # Precision: TP / (TP + FP)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    # Recall: TP / (TP + FN)
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    # F1-Score: 2 * (Precision * Recall) / (Precision + Recall)
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return accuracy, precision, recall, f1_score

# Define a function to train and evaluate the model
def train_and_evaluate_rf(num_trees, max_depth, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset):
    # Start the timer for this run
    run_start_time = time()

    # Initialize the RandomForestClassifier with manual hyperparameters
    rf = RandomForestClassifier(
        labelCol='Label', 
        numTrees=num_trees, 
        maxDepth=max_depth, 
        maxBins=max_bins, 
        minInstancesPerNode=min_instances, 
        minInfoGain=min_info_gain, 
        subsamplingRate=subsampling_rate, 
        featureSubsetStrategy=feature_subset
    )

    # Train the model on the training data
    rf_model = rf.fit(train_data)

    # Validate the model on the validation set
    val_predictions = rf_model.transform(val_data)
    val_predictions = val_predictions.withColumn("prediction", val_predictions["prediction"].cast(IntegerType()))

    # Calculate accuracy, precision, recall, and f1-score
    accuracy, precision, recall, f1_score = calculate_metrics(val_predictions)

    # Calculate the time taken for this run
    run_time = time() - run_start_time

    return rf_model, accuracy, precision, recall, f1_score, run_time

# Create a list to store the results for all combinations
results = []

# Manually test different hyperparameters
best_model = None
best_accuracy = 0

# Fixed values for hyperparameters
num_trees = 100
max_depth = 15
max_bins = 64

# Define hyperparameter ranges for remaining parameters
hyperparams = {
    'min_instances': [1, 5, 10, 20],            # Reduced from [1, 5, 10, 20] to [1, 10]
    'min_info_gain': [0.0, 0.01],        # Kept as is
    'subsampling_rate': [0.5, 0.7, 1.0],      # Reduced from [0.5, 0.7, 1.0] to [0.7, 1.0]
    'feature_subset': ['auto', 'sqrt', 'log2']   # Reduced from ['auto', 'sqrt', 'log2'] to ['auto', 'sqrt']
}

# Generate all possible combinations (Total: 2 * 2 * 2 * 2 = 16)
all_combinations = list(itertools.product(
    hyperparams['min_instances'],
    hyperparams['min_info_gain'],
    hyperparams['subsampling_rate'],
    hyperparams['feature_subset']
))

# Iterate over all combinations
for min_instances, min_info_gain, subsampling_rate, feature_subset in all_combinations:
    # Train and evaluate the model
    rf_model, accuracy, precision, recall, f1_score, run_time = train_and_evaluate_rf(
        num_trees, max_depth, max_bins, min_instances, min_info_gain, subsampling_rate, feature_subset
    )

    # Store the results of each combination
    results.append((min_instances, min_info_gain, subsampling_rate, feature_subset, accuracy, precision, recall, f1_score, run_time))

    # Print the metrics and the time taken for this run
    print(f"Validation Accuracy for numTrees={num_trees}, maxDepth={max_depth}, maxBins={max_bins}, "
          f"minInstances={min_instances}, minInfoGain={min_info_gain}, subsamplingRate={subsampling_rate}, "
          f"featureSubset={feature_subset}: {accuracy}")
    print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}")
    print(f"Time taken for this run: {run_time:.2f} seconds\n")

    # Track the best model based on validation accuracy
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = rf_model

# Print all the results
print("\nAll combinations and their validation accuracy, precision, recall, F1-score, and run time:")
for min_instances, min_info_gain, subsampling_rate, feature_subset, accuracy, precision, recall, f1_score, run_time in results:
    print(f"minInstances={min_instances}, minInfoGain={min_info_gain}, subsamplingRate={subsampling_rate}, "
          f"featureSubset={feature_subset}, Accuracy={accuracy}, Precision={precision}, Recall={recall}, "
          f"F1-Score={f1_score}, Time={run_time:.2f} seconds")

print(f"\nBest Validation Accuracy: {best_accuracy}")

# Test the final best model on the test set
test_predictions = best_model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", test_predictions["prediction"].cast(IntegerType()))

# Calculate test set performance metrics
test_accuracy, test_precision, test_recall, test_f1_score = calculate_metrics(test_predictions)

print(f"\nTest Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}, Test Recall: {test_recall}, Test F1-Score: {test_f1_score}")

# Unpersist the data
train_data.unpersist()
val_data.unpersist()
test_data.unpersist()

endTime = time()
print(f"\nTotal time taken: {endTime - startTime:.2f} seconds")

Validation Accuracy for numTrees=100, maxDepth=15, maxBins=64, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=auto: 0.9437002825254974
Precision: 0.9662693109531192, Recall: 0.9488429293459514, F1-Score: 0.9574768352988805
Time taken for this run: 170.62 seconds

Validation Accuracy for numTrees=100, maxDepth=15, maxBins=64, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=sqrt: 0.9437002825254974
Precision: 0.9662693109531192, Recall: 0.9488429293459514, F1-Score: 0.9574768352988805
Time taken for this run: 188.89 seconds

Validation Accuracy for numTrees=100, maxDepth=15, maxBins=64, minInstances=1, minInfoGain=0.0, subsamplingRate=0.5, featureSubset=log2: 0.9437002825254974
Precision: 0.9662693109531192, Recall: 0.9488429293459514, F1-Score: 0.9574768352988805
Time taken for this run: 172.24 seconds

Validation Accuracy for numTrees=100, maxDepth=15, maxBins=64, minInstances=1, minInfoGain=0.0, subsamplingRate=0.7, featureSubset=auto: 0.943800

### GBT

In [6]:
from time import time

startTime = time()

# Train the Random Forest model on the training data
gbt = GBTClassifier(labelCol='Label')
gbt_model = gbt.fit(train_data)


endTime = time()
print(f"{endTime - startTime} seconds")

37.18671751022339 seconds


In [8]:
from pyspark.sql.types import IntegerType
predictions = gbt_model.transform(test_data)
predictions = predictions.withColumn("prediction", predictions["prediction"].cast(IntegerType()))
predictions = predictions.withColumn("result", F.when((predictions["prediction"] == 1) & (predictions["Label"] == 1), 0) \
                                                .when((predictions["prediction"] == 0) & (predictions["Label"] == 1), 1) \
                                                .when((predictions["prediction"] == 1) & (predictions["Label"] == 0), 2) \
                                                .otherwise(3))
tp = predictions.filter(predictions["result"] == 0).count()
fp = predictions.filter(predictions["result"] == 1).count()
fn = predictions.filter(predictions["result"] == 2).count()
tn = predictions.filter(predictions["result"] == 3).count()
print(
f"True Positives: {tp}\n \
False Positives: {fp}\n \
False Negatives: {fn}\n \
True Negatives: {tn}"
)

accuracy = (tp + tn) / (tp + fp + fn + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_score = (2 * precision * recall) / (precision + recall)

print(
f"Accuracy: {accuracy}\n \
Precision: {precision}\n \
Recall: {recall}\n \
F1-Score: {f1_score}"
)

True Positives: 249219
 False Positives: 18036
 False Negatives: 10599
 True Negatives: 121955
Accuracy: 0.9283783006385549
 Precision: 0.9325138912274794
 Recall: 0.9592060596263539
 F1-Score: 0.9456716621796223


### GBT parameters

In [15]:
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import GBTClassifier
from pyspark.sql import functions as F
from pyspark import StorageLevel
from time import time
import itertools

startTime = time()



# Split the data into training (20%), validation (40%), and testing (40%) sets
train_data, val_data, test_data = final_data.randomSplit([0.2, 0.4, 0.4], seed=16)

# Repartition the data to reduce memory pressure
train_data = train_data.repartition(128)
val_data = val_data.repartition(128)
test_data = test_data.repartition(128)

# Persist the data with MEMORY_AND_DISK to allow spilling
train_data.persist(StorageLevel.MEMORY_AND_DISK)
val_data.persist(StorageLevel.MEMORY_AND_DISK)
test_data.persist(StorageLevel.MEMORY_AND_DISK)

# Define a function to calculate precision, recall, f1-score, and accuracy
def calculate_metrics(predictions):
    # Confusion matrix components
    tp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 1)).count()
    fp = predictions.filter((predictions['prediction'] == 1) & (predictions['Label'] == 0)).count()
    tn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 0)).count()
    fn = predictions.filter((predictions['prediction'] == 0) & (predictions['Label'] == 1)).count()

    # Accuracy: (TP + TN) / Total
    accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) != 0 else 0
    # Precision: TP / (TP + FP)
    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    # Recall: TP / (TP + FN)
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    # F1-Score: 2 * (Precision * Recall) / (Precision + Recall)
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return accuracy, precision, recall, f1_score

# Define a function to train and evaluate the model
def train_and_evaluate_gbt(num_trees, max_depth, learning_rate, subsampling_rate, min_instances, min_info_gain, feature_subset):
    # Start the timer for this run
    run_start_time = time()

    # Initialize the GBTClassifier with manual hyperparameters
    gbt = GBTClassifier(
        labelCol='Label',
        maxIter=num_trees,          # Number of trees
        maxDepth=max_depth,         # Tree depth
        stepSize=learning_rate,     # Learning rate
        subsamplingRate=subsampling_rate,   # Fraction of data for each tree
        minInstancesPerNode=min_instances,  # Minimum instances for splitting
        minInfoGain=min_info_gain,  # Minimum info gain for a split
        featureSubsetStrategy=feature_subset
    )

    # Train the model on the training data
    gbt_model = gbt.fit(train_data)

    # Validate the model on the validation set
    val_predictions = gbt_model.transform(val_data)
    val_predictions = val_predictions.withColumn("prediction", val_predictions["prediction"].cast(IntegerType()))

    # Calculate accuracy, precision, recall, and f1-score
    accuracy, precision, recall, f1_score = calculate_metrics(val_predictions)

    # Calculate the time taken for this run
    run_time = time() - run_start_time

    return gbt_model, accuracy, precision, recall, f1_score, run_time

# Create a list to store the results for all combinations
results = []

# Manually test different hyperparameters
best_model = None
best_accuracy = 0

# Define hyperparameter ranges for Gradient Boosting
hyperparams = {
    'learning_rate': [0.05, 0.1, 0.2],              # Step size shrinkage
    'num_trees': [50, 100],                         # Number of boosting iterations
    'max_depth': [4, 6],                            # Depth of trees
    'subsampling_rate': [0.7, 1.0],                 # Fraction of data for each tree
    'min_instances_per_node': [5, 10],              # Minimum number of instances for a split
    'min_info_gain': [0.0, 0.01],                   # Minimum info gain
    'feature_subset_strategy': ['auto', 'sqrt']     # Feature subset strategy
}

# Generate all possible combinations (Total: 3 * 2 * 2 * 2 * 2 * 2 = 48)
all_combinations = list(itertools.product(
    hyperparams['learning_rate'],
    hyperparams['num_trees'],
    hyperparams['max_depth'],
    hyperparams['subsampling_rate'],
    hyperparams['min_instances_per_node'],
    hyperparams['min_info_gain'],
    hyperparams['feature_subset_strategy']
))

# Iterate over all combinations
for learning_rate, num_trees, max_depth, subsampling_rate, min_instances, min_info_gain, feature_subset in all_combinations:
    # Train and evaluate the model
    gbt_model, accuracy, precision, recall, f1_score, run_time = train_and_evaluate_gbt(
        num_trees, max_depth, learning_rate, subsampling_rate, min_instances, min_info_gain, feature_subset
    )

    # Store the results of each combination
    results.append((learning_rate, num_trees, max_depth, subsampling_rate, min_instances, min_info_gain, feature_subset, accuracy, precision, recall, f1_score, run_time))

    # Print the metrics and the time taken for this run
    print(f"Validation Accuracy for (learning_rate={learning_rate}, num_trees={num_trees}, max_depth={max_depth}, subsampling_rate={subsampling_rate}, min_instances={min_instances}, min_info_gain={min_info_gain}, feature_subset={feature_subset}): {accuracy}")
    print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1_score}, Time: {run_time} seconds")

    # Update the best model if current one is better
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        best_model = gbt_model

# End of training time
endTime = time() - startTime
print(f"\nTotal time taken for tuning: {endTime} seconds")

# Save the best model (Optional)
# best_model.save("path_to_save_model")

# Test the best model on the test set
test_predictions = best_model.transform(test_data)
test_predictions = test_predictions.withColumn("prediction", test_predictions["prediction"].cast(IntegerType()))

# Calculate final test metrics
test_accuracy, test_precision, test_recall, test_f1_score = calculate_metrics(test_predictions)

# Print the final test results
print(f"\nTest Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}, Test Recall: {test_recall}, Test F1-Score: {test_f1_score}")



Validation Accuracy for (learning_rate=0.05, num_trees=50, max_depth=4, subsampling_rate=0.7, min_instances=5, min_info_gain=0.0, feature_subset=auto): 0.9241789728895746
Precision: 0.9550591640812053, Recall: 0.9302720578310362, F1-Score: 0.9425026685964133, Time: 133.95767617225647 seconds
Validation Accuracy for (learning_rate=0.05, num_trees=50, max_depth=4, subsampling_rate=0.7, min_instances=5, min_info_gain=0.0, feature_subset=sqrt): 0.9204720780652013
Precision: 0.9564166718468329, Recall: 0.9230094335378016, F1-Score: 0.9394161419576418, Time: 115.95664763450623 seconds
Validation Accuracy for (learning_rate=0.05, num_trees=50, max_depth=4, subsampling_rate=0.7, min_instances=5, min_info_gain=0.01, feature_subset=auto): 0.9114603161881099
Precision: 0.9498642773253688, Recall: 0.9157955516894881, F1-Score: 0.9325188508160733, Time: 86.5714373588562 seconds
Validation Accuracy for (learning_rate=0.05, num_trees=50, max_depth=4, subsampling_rate=0.7, min_instances=5, min_info_ga

### Logistic Regression

In [10]:
from time import time

startTime = time()

lr = LogisticRegression(labelCol='Label')
lr_model = lr.fit(train_data)


endTime = time()
print(f"{endTime - startTime} seconds")

22.139557600021362 seconds


In [11]:
from pyspark.sql.types import IntegerType

predictions = lr_model.transform(test_data)
predictions = predictions.withColumn("prediction", predictions["prediction"].cast(IntegerType()))
predictions = predictions.withColumn("result", F.when((predictions["prediction"] == 1) & (predictions["Label"] == 1), 0) \
                                                .when((predictions["prediction"] == 0) & (predictions["Label"] == 1), 1) \
                                                .when((predictions["prediction"] == 1) & (predictions["Label"] == 0), 2) \
                                                .otherwise(3))
tp = predictions.filter(predictions["result"] == 0).count()
fp = predictions.filter(predictions["result"] == 1).count()
fn = predictions.filter(predictions["result"] == 2).count()
tn = predictions.filter(predictions["result"] == 3).count()
print(
f"True Positives: {tp}\n \
False Positives: {fp}\n \
False Negatives: {fn}\n \
True Negatives: {tn}"
)
accuracy = (tp + tn) / (tp + fp + fn + tn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1_score = (2 * precision * recall) / (precision + recall)
print(
f"Accuracy: {accuracy}\n \
Precision: {precision}\n \
Recall: {recall}\n \
F1-Score: {f1_score}"
)

True Positives: 219205
 False Positives: 48050
 False Negatives: 10817
 True Negatives: 121737
Accuracy: 0.85276219394761
 Precision: 0.8202091635329554
 Recall: 0.9529740633504621
 F1-Score: 0.8816213096523667
