In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create the Spark session for this notebook (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("ClassificationModel")
    .getOrCreate()
)

# Read the CSV, treating the first row as the header and inferring column types.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("daily_aggregation2.csv")
)

# Quick sanity check: the inferred schema followed by the first five rows.
df.printSchema()
df.show(n=5)


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- TotalSales: double (nullable = true)
 |-- Anomaly: integer (nullable = true)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|    CustomerName|TotalSales|Anomaly|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+----------------+----------+-------+
|   536460|    22926|IVORY GIANT GARDE...|       4| 2024-06-16|     5.95|   14849.0|United Kingdom|     Anna Miller|      23.8|    

# Anomaly classification with logistic regression

In [3]:
from pyspark.ml.feature import VectorAssembler

# Columns used as model inputs; the label is taken from the "Anomaly" column.
feature_cols = ["Quantity", "UnitPrice", "TotalSales"]

# Keep only rows where the label and every feature column are non-null.
df = df.dropna(subset=["Anomaly"] + feature_cols)

# Pack the feature columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Project down to the two columns used downstream: the feature vector and
# the "Anomaly" column renamed to "label".
data = (
    assembler
    .transform(df)
    .select("features", col("Anomaly").alias("label"))
)
data.show(n=5)


+-----------------+-----+
|         features|label|
+-----------------+-----+
|  [4.0,5.95,23.8]|    0|
|  [1.0,1.65,1.65]|    0|
|  [6.0,1.85,11.1]|    0|
|[1.0,19.95,19.95]|    0|
| [12.0,1.25,15.0]|    0|
+-----------------+-----+
only showing top 5 rows



In [4]:
# Hold out roughly 20% of the rows for evaluation; the seed is pinned so the
# random split can be repeated.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=42)


In [5]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression estimator: only the input/label column names are set,
# every other hyperparameter is left at its default.
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Fit on the training split only; the test split stays untouched for scoring.
lr_model = lr.fit(dataset=train_data)


In [6]:
# Score the held-out split with the fitted model.
predictions = lr_model.transform(test_data)

# Preview five scored rows: inputs, true label, predicted class, probabilities.
(
    predictions
    .select("features", "label", "prediction", "probability")
    .show(n=5)
)


+--------------------+-----+----------+-----------+
|            features|label|prediction|probability|
+--------------------+-----+----------+-----------+
|[-144.0,0.72,-103...|    0|       0.0|  [1.0,0.0]|
| [-48.0,4.95,-237.6]|    0|       0.0|  [1.0,0.0]|
|  [-36.0,0.85,-30.6]|    0|       0.0|  [1.0,0.0]|
|  [-12.0,2.55,-30.6]|    0|       0.0|  [1.0,0.0]|
|    [-4.0,1.95,-7.8]|    0|       0.0|  [1.0,0.0]|
+--------------------+-----+----------+-----------+
only showing top 5 rows



In [7]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, computed from the "rawPrediction" column of the
# scored test split.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)

auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")


AUC: 1.0000


In [8]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy plus weighted precision / recall / F1 for the scored test split.
# A single evaluator is reused for every metric (re-pointed via setMetricName)
# instead of constructing four evaluators that differ only in metricName.
multiclass_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction"
)

# The metric order here must match the unpacking targets on the left.
accuracy, precision, recall, f1 = (
    multiclass_evaluator.setMetricName(metric).evaluate(predictions)
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

# NOTE(review): precision, recall and F1 below are the *weighted* variants.
# If label 1 (anomaly) is rare, these scores are presumably dominated by
# label 0 -- confirm the class balance before relying on them.
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1 Score: 1.0000


In [9]:
from pyspark.ml.classification import LogisticRegression

# Rebuilds and refits the estimator with the same column settings and the same
# training split as the earlier training cell, rebinding `lr` and `lr_model`.
lr = LogisticRegression(labelCol="label", featuresCol="features")
lr_model = lr.fit(dataset=train_data)


In [10]:
# Re-score the test split with the current model, then show the true label
# next to the predicted class for the first ten rows.
predictions = lr_model.transform(test_data)
predictions.select(["label", "prediction"]).show(n=10)


+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 10 rows



In [11]:
# Same scoring step as the previous cell: transform the test split and list
# label vs. prediction for ten rows.
predictions = lr_model.transform(test_data)
(
    predictions
    .select("label", "prediction")
    .show(n=10)
)


+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 10 rows



In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One multiclass evaluator, re-pointed at a different metric before each
# evaluation. Note: this rebinds `evaluator`, which earlier in the notebook
# held the BinaryClassificationEvaluator used for AUC.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

evaluator.setMetricName("accuracy")
accuracy = evaluator.evaluate(predictions)

evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictions)

evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)

evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictions)

# Report all four scores to four decimal places.
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1 Score: 1.0000
