# Lab 3
- Structured API Approach

In [157]:
from pyspark.sql import SparkSession
import os

# Set JAVA_HOME explicitly
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-21-openjdk-amd64'

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

In [None]:
# Reduce Spark's verbose logging
spark.sparkContext.setLogLevel("WARN")
print("Spark logging level set to WARN - output will be cleaner now!")

## Important Note About Spark "Errors"

When you run Spark code, you might see messages with `[error]` prefix. **These are NOT actual errors!** 

They are Spark's normal startup logs being displayed through the error stream. The program works perfectly fine.

- `INFO` messages = Normal operation information
- `WARN` messages = Warnings (usually safe to ignore)
- Only `ERROR` messages indicate real problems

The cell above sets logging to `WARN` level to reduce verbose output.

In [158]:
spark   

In [159]:
df = spark.read.format('csv').load("/home/aaronpham/Coding/bigdata/spark/spark_mllib/data/creditcard_balanced_single.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- Time: integer (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (null

In [160]:
df.show(5)

+----+-----------------+-----------------+------------------+----------------+------------------+-------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+--------------------+-----------------+-----------------+-------------------+-----------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------+-----+
|Time|               V1|               V2|                V3|              V4|                V5|                 V6|               V7|                 V8|                V9|               V10|               V11|               V12|               V13|              V14|                 V15|              V16|              V17|                V18|              V19|                V20|               V21|                V22|               V23|       

In [161]:
df.count()

984

## Split the dataset into train & test

In [162]:
df.groupBy("Class").count().show()

+-----+-----+
|Class|count|
+-----+-----+
|    1|  492|
|    0|  492|
+-----+-----+



In [163]:
# split dataset into train and test
train, test = df.randomSplit([0.8, 0.2])

train, test

(DataFrame[Time: int, V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, Class: int],
 DataFrame[Time: int, V1: double, V2: double, V3: double, V4: double, V5: double, V6: double, V7: double, V8: double, V9: double, V10: double, V11: double, V12: double, V13: double, V14: double, V15: double, V16: double, V17: double, V18: double, V19: double, V20: double, V21: double, V22: double, V23: double, V24: double, V25: double, V26: double, V27: double, V28: double, Amount: double, Class: int])

## Replace null value with average value

In [164]:
# Replace the null value with average value
from pyspark.ml.feature import Imputer

numerical_features_lst = train.columns

imputer = Imputer(inputCols=numerical_features_lst, outputCols=numerical_features_lst)

imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)

train.show(3)

+----+-----------------+-----------------+-----------------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+--------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-------------------+-----------------+-----------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+-----+
|Time|               V1|               V2|               V3|              V4|                V5|                V6|               V7|                 V8|                V9|               V10|                 V11|               V12|               V13|              V14|              V15|              V16|               V17|                V18|              V19|              V20|               V21|                V22|               V23|               V2

## Aggregate all columns into one features

In [165]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Pick all features into one feature
input_cols = [c for c in df.columns if c != "Class"]

# Assemble into 1 vectors
assembler = VectorAssembler(inputCols=input_cols, outputCol="features_assembled")

In [166]:
train = assembler.transform(train)
test = assembler.transform(test)

train.show(2)

+----+----------------+-----------------+-----------------+---------------+------------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+-----------------+-------------------+-----------------+-----------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+-----+--------------------+
|Time|              V1|               V2|               V3|             V4|                V5|               V6|               V7|                 V8|                V9|               V10|               V11|               V12|               V13|              V14|              V15|              V16|              V17|                V18|              V19|              V20|              V21|                V22|               V23|          

## Standardize the dataset
- Format the data in to a fixed range for further ML processing

For each value it will apply the following algorithms

```
scaled_value = (original_value - mean) / standard_deviation
```

In [167]:
# scale the features 
scaler = StandardScaler(inputCol="features_assembled", outputCol="features", withStd=True, withMean=True)

scaler_model = scaler.fit(train)

train = scaler_model.transform(train)
test = scaler_model.transform(test)

train.show(3)

+----+-----------------+-----------------+-----------------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+--------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+-------------------+-----------------+-----------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+-----+--------------------+--------------------+
|Time|               V1|               V2|               V3|              V4|                V5|                V6|               V7|                 V8|                V9|               V10|                 V11|               V12|               V13|              V14|              V15|              V16|               V17|                V18|              V19|              V20|               V21|              

In [168]:
train.select('features').take(3)

[Row(features=DenseVector([-1.7681, 0.0061, 0.0568, 0.2853, 0.553, 0.2524, -0.427, 0.0356, 0.2347, -0.6884, -0.011, 0.4914, 0.0271, -0.5261, -0.2021, 0.4614, 0.2562, 0.0623, 0.4558, 0.0559, -0.063, 0.0434, 0.0063, -0.4611, 0.6543, 0.016, 0.339, 0.1796, -0.3889, -0.4429])),
 Row(features=DenseVector([-1.7667, -0.1248, -1.2929, 0.7126, 0.015, 0.6954, -0.2302, 0.5313, -0.0502, 0.4032, 0.4273, -0.8265, 0.5512, 0.68, 0.3578, 2.0758, 0.7802, 0.6417, 1.1904, -0.0494, 1.7677, 0.0922, 0.3926, 1.2437, -0.4145, 0.3749, -0.3353, -0.3238, 0.0172, 1.6344])),
 Row(features=DenseVector([-1.75, -0.1954, -1.0817, 0.589, -0.3882, 0.7854, 0.2412, 0.5664, -0.0431, 0.7977, 0.4936, -0.6785, 0.826, 0.8146, 0.302, -1.4982, 0.8194, 0.4881, 0.9231, 0.02, -1.4101, -0.301, 0.6493, 1.9987, -1.4739, 0.5125, -0.9216, 0.1793, -1.6985, 0.1152]))]

# Logistic Regression 

In [169]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="Class")

lr


LogisticRegression_094ab7074c60

In [170]:
model = lr.fit(train)

pred_train_df = model.transform(train)

pred_train_df.show(5)

+----+-----------------+-----------------+-------------------+-------------------+------------------+------------------+-----------------+-------------------+--------------------+------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|Time|               V1|               V2|                 V3|                 V4|                V5|                V6|               V7|                 V8|                  V9|               V10|                 V11|               V12|               V13|               V14|               V15|               V16|               V17|             

## Prediction

In [171]:
pred_test_df = model.transform(test)
pred_test_df.show(10)

+-----+-------------------+-----------------+------------------+----------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+----------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------------+--------------------+-------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| Time|                 V1|               V2|                V3|              V4|                V5|                 V6|                 V7|                 V8|                V9|               V10|             V11|              V12|               V13|              V14|               V15|               V16|              V17|                V18|    

## Evaluation

In [172]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC
evaluator_auc = BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC")
auc = evaluator_auc.evaluate(pred_test_df)

# Accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="accuracy")
acc = evaluator_acc.evaluate(pred_test_df)

# Precision & Recall
precision = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="weightedPrecision").evaluate(pred_test_df)
recall = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction", metricName="weightedRecall").evaluate(pred_test_df)

print(f"AUC = {auc:.4f}")
print(f"Accuracy = {acc:.4f}")
print(f"Precision = {precision:.4f}")
print(f"Recall = {recall:.4f}")


AUC = 0.9769
Accuracy = 0.9356
Precision = 0.9443
Recall = 0.9356


In [173]:
print("Intercept:", model.interceptVector)
print("Coefficients:", model.coefficients)

Intercept: [3.232785987779118]
Coefficients: [-0.16472484274040347,0.22475069539169282,0.6539398186556453,0.329168762808169,2.271945170727748,0.9219757425963592,-0.6700329257501637,0.16061847009173583,-1.2106345574146702,-0.3290285040183484,-1.201844654165599,0.4140757360043295,-2.3114942076867044,-0.13414072601492324,-3.0034320003616037,-0.2509425005021272,-0.669858731492209,0.049519637700306866,0.21939372676356378,-0.12117089621896632,-0.4375869996205082,0.2573825395733252,0.6711992603935559,-0.17130821808443292,0.20750926588944352,-0.18492060610484665,-0.1632901499080887,0.4933068131598513,0.18453891363396915,0.7638929739457891]


## Save results

In [174]:
pred_test_df.select("Class", "prediction", "probability") \
    .toPandas().to_csv("/home/aaronpham/Coding/bigdata/spark/spark_mllib/results/Classification_Structured.csv", index=False)