<a href="https://colab.research.google.com/github/Natchira/MLiS1/blob/main/local.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [44]:
!pip install pyspark



In [45]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Local approach") \
    .getOrCreate()

sc = spark.sparkContext

In [46]:
import pandas as pd
from pyspark.sql import functions as F
from matplotlib import pyplot as plt

##### Load data

In [47]:
!pip install kaggle



In [48]:
from google.colab import files
files.upload()

!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle (1).json
mkdir: cannot create directory ‘/root/.kaggle’: File exists


In [49]:
!kaggle datasets download -d mlg-ulb/creditcardfraud --unzip

Dataset URL: https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud
License(s): DbCL-1.0


In [50]:
df = spark.read.csv("creditcard.csv", header=True, inferSchema=True)
df.show(5)

(Wide table truncated in this export: first 5 rows of the 31 columns Time, V1 to V28, Amount and Class.)

Correlation

In [51]:
# Pearson correlation between Time and the label
corr_value = df.stat.corr("Time", "Class")
corr_value

-0.012322570929245536

In [52]:
# Pearson correlation between Amount and the label
corr_value = df.stat.corr("Amount", "Class")
corr_value

0.005631753006768532

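Both coefficients are close to zero (-0.012 for Time, 0.006 for Amount), so neither Time nor Amount has a linear relationship with Class, and both columns are dropped. Pearson correlation only measures linear dependence, so a non-linear relationship is not ruled out.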
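`df.stat.corr` returns Pearson's r, the covariance of the two columns divided by the product of their standard deviations. A minimal NumPy sketch (the helper `pearson` and the toy data are illustrative, not part of the notebook) shows why a value near zero only rules out a *linear* relationship: y = x² depends on x exactly, yet r is zero on data symmetric around 0.

```python
import numpy as np

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    xc, yc = x - x.mean(), y - y.mean()
    return float(np.sum(xc * yc) / np.sqrt(np.sum(xc ** 2) * np.sum(yc ** 2)))

x = np.linspace(-1.0, 1.0, 201)
r_linear = pearson(x, 2 * x + 1)   # exact linear dependence
r_quadratic = pearson(x, x ** 2)   # exact, but non-linear, dependence

print(r_linear, r_quadratic)
```

The first value is 1 (up to rounding) and the second is essentially 0, even though y is fully determined by x in both cases.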

In [53]:
df = df.drop("Time", "Amount")
df.cache()
df.show(5)

(Wide table truncated in this export: first 5 rows of the 29 remaining columns V1 to V28 and Class.)

##### Split into training and test sets

In [54]:
train_df, test_df = df.randomSplit([0.7, 0.3], seed=202503)
print('training:',train_df.count())
print('testing:', test_df.count())

training: 199251
testing: 85556


In [55]:
# percentage of each class in the training and test sets
from pyspark.sql.functions import col, round as spark_round  # alias keeps Python's built-in round usable

# training set
total_train = train_df.count()
train_df.groupBy("class").count()\
    .withColumn("%", spark_round(col("count")/total_train * 100, 3))\
    .show()

# test set
total_test = test_df.count()
test_df.groupBy("class").count()\
    .withColumn("%", spark_round(col("count")/total_test * 100, 3))\
    .show()

+-----+------+------+
|class| count|     %|
+-----+------+------+
|    1|   350| 0.176|
|    0|198901|99.824|
+-----+------+------+

+-----+-----+------+
|class|count|     %|
+-----+-----+------+
|    1|  142| 0.166|
|    0|85414|99.834|
+-----+-----+------+
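The percentages follow directly from the counts above. A small plain-Python sketch (the helper `class_percentages` is hypothetical) reproduces them and also gives the imbalance ratio of the training set, roughly 568 legitimate transactions per fraud case:

```python
def class_percentages(counts):
    """Percentage of rows per class, rounded to 3 decimals as in the Spark cell."""
    total = sum(counts.values())
    return {label: round(n / total * 100, 3) for label, n in counts.items()}

# Class counts copied from the outputs above
train_counts = {1: 350, 0: 198901}
test_counts = {1: 142, 0: 85414}

print(class_percentages(train_counts))
print(class_percentages(test_counts))

# Imbalance ratio: majority rows per minority row in the training set
print(round(train_counts[0] / train_counts[1], 1))
```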



Extract the minority (fraud) class from the training set

In [56]:
Minority_class = train_df.filter(col("class") == 1)
Minority_class.show(5)

(Wide table truncated in this export: first 5 fraud rows, columns V1 to V28 and Class.)

K-Means

In [57]:
# Assemble the 28 feature columns into a single vector column (no scaling is applied)
from pyspark.ml.feature import VectorAssembler
feature_cols = [c for c in Minority_class.columns if c != "Class"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_vectors = assembler.transform(Minority_class)
feature_vectors.show(5, truncate=False)

(Wide table truncated in this export: the same rows with the assembled features vector column appended.)

In [58]:
# MinMaxScaler
# scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# scaler_model = scaler.fit(feature_vectors)
# scaled_df = scaler_model.transform(feature_vectors)
# scaled_df.show(5,truncate=False)

In [59]:
print("Number of partitions:", feature_vectors.rdd.getNumPartitions())

Number of partitions: 2


In [60]:
# X_df_1 = feature_vectors.select(feature_vectors['features']).repartition(1)
# X_df_1.cache()
# X_df_1.show(5,truncate=False)

In [61]:
X_df_2 = feature_vectors.select(feature_vectors['features'])
X_df_2.cache()
X_df_2.show(5,truncate=False)

(Table truncated in this export: first 5 rows of the features vector column.)

Oversampling time measurement

In [62]:
import time
start_time = time.time()

# K-means

In [63]:
import numpy as np
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.ml.stat import Summarizer

def kmeans(df, k, sample_fraction, threshold=0.01, seed=1, max_iter=100):
    # Lloyd's k-means on a DataFrame with one vector column "features".
    # max_iter is only a safety cap; the loop normally stops on the threshold.

    # Initial centroids: the first k rows of a random sample
    init_rows = df.sample(fraction=sample_fraction, seed=seed).limit(k).collect()
    if len(init_rows) < k:
        raise ValueError("sample returned fewer than k rows; increase sample_fraction")
    centroids = np.array([row["features"].toArray() for row in init_rows])

    # Euclidean distance (axis=None gives one distance between whole centroid sets)
    def dist(a, b, axis=1):
        return np.sqrt(np.sum((a - b) ** 2, axis=axis))

    for _ in range(max_iter):
        print("\nCurrent centroids:\n", centroids)

        # Broadcast the current centroids to the workers
        centroids_bc = sc.broadcast(centroids)

        # Assign each data point to its closest centroid
        def assign_cluster(point):
            return int(np.argmin(dist(centroids_bc.value, point.toArray())))

        closest = F.udf(assign_cluster, IntegerType())
        df_closest = df.withColumn("closest", closest("features"))

        # New centroid = mean of the points assigned to each cluster
        rows = df_closest.groupBy("closest") \
            .agg(Summarizer.mean(F.col("features")).alias("new_centroids")) \
            .collect()

        # A cluster that received no points keeps its previous centroid
        new_centroids = centroids.copy()
        for r in rows:
            new_centroids[r["closest"]] = r["new_centroids"].toArray()

        print("new centroids:\n", new_centroids)

        # Converged when the total centroid shift falls below the threshold
        diff = dist(new_centroids, centroids, axis=None)
        print("diff:", diff)
        if diff < threshold:
            break
        centroids = new_centroids

    final_df = df_closest.select("features", "closest").withColumnRenamed("closest", "cluster")
    return centroids, final_df
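The Spark loop above is Lloyd's algorithm: assign every point to its nearest centroid, move each centroid to the mean of its points, and stop once the centroids barely move. To check that logic on a single machine, here is a NumPy-only sketch; `kmeans_numpy`, the initialisation via `numpy.random.default_rng`, and the two-blob toy data are assumptions for illustration, not part of the notebook.

```python
import numpy as np

def kmeans_numpy(X, k, threshold=0.01, seed=1, max_iter=100):
    """Lloyd's k-means on an (n, d) array; mirrors the Spark loop on one machine."""
    rng = np.random.default_rng(seed)
    # Initial centroids: k distinct rows chosen at random
    centroids = X[rng.choice(len(X), size=k, replace=False)].astype(float)
    for _ in range(max_iter):
        # Assignment step: index of the nearest centroid for every point
        d = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
        labels = d.argmin(axis=1)
        # Update step: mean of each cluster (an empty cluster keeps its old centroid)
        new_centroids = centroids.copy()
        for j in range(k):
            members = X[labels == j]
            if len(members):
                new_centroids[j] = members.mean(axis=0)
        # Total centroid shift (Frobenius norm), as dist(..., axis=None) in the Spark version
        shift = np.linalg.norm(new_centroids - centroids)
        centroids = new_centroids
        if shift < threshold:
            break
    # Final assignment consistent with the returned centroids
    d = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
    return centroids, d.argmin(axis=1)

# Two well-separated synthetic blobs (illustrative data, not the credit-card set)
rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0.0, 0.5, size=(50, 2)),
               rng.normal(10.0, 0.5, size=(50, 2))])
centroids, labels = kmeans_numpy(X, k=2, seed=42)
print(centroids)
```

On data this well separated, the two blobs end up in different clusters with centroids near (0, 0) and (10, 10).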

In [64]:
# final_centroids, clustered_df_1 = kmeans(
#     df=X_df_1,
#     k=3,
#     sample_fraction=0.1,
#     threshold=0.01,
#     seed=42
# )

In [65]:
final_centroids, clustered_df_2partitions = kmeans(
    df=X_df_2,
    k=3,
    sample_fraction=0.1,
    threshold=0.01,
    seed=42
)


Current centroids:
 [[-23.23792024  13.48738579 -25.18877297   6.26173255 -17.34518817
   -4.53498915 -17.10049248  15.37463003  -3.84556677  -8.5117667
    5.13854732  -7.22001986   0.61579329  -7.3272216   -0.03863207
   -6.33151509 -12.68885763  -4.84738161   1.02053588   1.63078748
    1.76970813  -1.69197331  -1.04567292   0.1433865    1.61157707
   -0.22157564   1.48123261   0.4381248 ]
 [-16.91746827   9.66990017 -23.73644341  11.82499023  -9.83054823
   -2.51482887 -17.29065667   1.82040787  -6.26490349 -12.91663611
    9.5671103  -13.71706738   0.89954071 -13.27296506  -0.40226004
   -7.75409406 -11.64460321  -4.74130271   0.58462597   0.9967451
   -2.33611096   0.97275473   1.24186581  -1.05108625   0.03800908
    0.67231707   2.10847085  -1.42124325]
 [-16.52650657   8.5849718  -18.64985319   9.50559352 -13.79381853
   -2.8324043  -16.7016943    7.5173439   -8.50705864 -14.11018444
    5.29923635 -10.83400648   1.67112025  -9.37385858   0.36080564
   -9.89924654 -19.2362923

In [66]:
clustered_df_2partitions.show(truncate=False)

(Table truncated in this export: each features vector with its assigned cluster id.)

KNN: generate synthetic minority points within each cluster

In [67]:
from pyspark.ml.linalg import Vectors
from sklearn.neighbors import NearestNeighbors
import builtins

# Label every clustered minority point as class 1, then count the points per cluster
clustered_class1_df = clustered_df_2partitions.withColumn("class", F.lit(1))
cluster_counts = clustered_df_2partitions.groupBy("cluster").count().rdd.collectAsMap()

total_minority = builtins.sum(cluster_counts.values())
max_majority   = train_df.filter(F.col("class") == 0).count()
print("total_minority =", total_minority)
print("max_majority =", max_majority)

total_minority = 350
max_majority = 198901


In [68]:
# Target size per cluster: the majority count split in proportion to each cluster's share of the minority class
target_counts = {
    c: int(builtins.round(cnt / total_minority * max_majority))
    for c, cnt in cluster_counts.items()
}
target_counts

{1: 132411, 2: 39780, 0: 26710}

In [69]:
need_counts = {
    c: max(target_counts[c] - cluster_counts[c], 0)
    for c in cluster_counts
}
need_counts

{1: 132178, 2: 39710, 0: 26663}
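The cluster sizes are not printed directly, but they follow from the two outputs above (target minus need): 47, 233 and 70 points for clusters 0, 1 and 2, which sum to the 350 fraud rows. This plain-Python sketch reproduces the arithmetic with those inferred sizes:

```python
# Cluster sizes inferred from the printed outputs (target_counts - need_counts)
cluster_counts = {0: 47, 1: 233, 2: 70}
total_minority = sum(cluster_counts.values())
max_majority = 198901

# Each cluster's target is its share of the minority class, scaled to the majority count
target_counts = {c: round(n / total_minority * max_majority) for c, n in cluster_counts.items()}
# Synthetic points still needed per cluster
need_counts = {c: max(target_counts[c] - n, 0) for c, n in cluster_counts.items()}

print(target_counts)
print(need_counts)
print(total_minority + sum(need_counts.values()))
```

Because each target is rounded independently, the per-cluster targets do not have to sum exactly to the majority count in general. Here they do (26710 + 132411 + 39780 = 198901), which is why the balanced training set ends up with 198901 rows in each class.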

In [70]:
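import random

# Create synthetic points by SMOTE-style interpolation
# (unseeded, so the synthetic points differ between runs)
def create_synthetic_data(data, list_neighbors, num_samples):
    # data: (n, d) array of minority points from one cluster
    # list_neighbors: (n, k) neighbour indices from NearestNeighbors.kneighbors
    synthetic_data = []

    for _ in range(num_samples):
        # Pick a random minority point
        index = np.random.randint(0, data.shape[0])
        data_point = data[index]

        # Pick one of its neighbours, skipping the point itself
        neighbor_index = random.choice([i for i in list_neighbors[index] if i != index])
        neighbor = data[neighbor_index]

        # New point on the segment between the two: x + u * (neighbor - x), u ~ U(0, 1)
        random_float = random.random()
        new_synthetic_data_point = data_point + random_float * (neighbor - data_point)
        synthetic_data.append(new_synthetic_data_point)

    return np.array(synthetic_data)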
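`create_synthetic_data` applies the SMOTE interpolation rule: take a minority point x, one of its nearest neighbours x_nn, and a uniform u in [0, 1), then emit x + u * (x_nn - x). Below is a NumPy-only sketch of the whole step, with a brute-force neighbour search standing in for scikit-learn's `NearestNeighbors`. The helper `smote_numpy` and the toy data are hypothetical. Unlike the notebook cell, which queries the same array it fitted and so normally counts each point as its own first neighbour, this version excludes self-matches before taking the k neighbours.

```python
import numpy as np

def smote_numpy(X, n_samples, k=5, seed=None):
    """SMOTE-style oversampling: interpolate between points and their k nearest neighbours."""
    rng = np.random.default_rng(seed)
    # Brute-force pairwise Euclidean distances; self-distance set to inf to exclude it
    d = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=2)
    np.fill_diagonal(d, np.inf)
    k = min(k, len(X) - 1)
    neighbours = np.argsort(d, axis=1)[:, :k]                   # (n, k) neighbour indices

    base = rng.integers(0, len(X), size=n_samples)              # random anchor points
    pick = neighbours[base, rng.integers(0, k, size=n_samples)] # one neighbour per anchor
    gap = rng.random((n_samples, 1))                            # u ~ U(0, 1) per new point
    return X[base] + gap * (X[pick] - X[base])

# Four collinear points: every synthetic point must stay on the segment [0, 3] x {0}
X = np.array([[0.0, 0.0], [1.0, 0.0], [2.0, 0.0], [3.0, 0.0]])
synth = smote_numpy(X, n_samples=10, k=2, seed=0)
print(synth.shape)
```

Because each new point is a convex combination of two existing minority points, synthetic data never leaves the region spanned by the cluster's points.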

In [71]:
from sklearn.neighbors import NearestNeighbors
from pyspark.ml.linalg import Vectors

# Generate synthetic minority points cluster by cluster
synthetic_data_df = []
for c, need in need_counts.items():
    if need <= 0:
        continue
    # Collect this cluster's minority points to the driver as a NumPy array
    arr = np.array(
        clustered_df_2partitions
        .filter(F.col("cluster") == c)
        .select("features")
        .rdd
        .map(lambda row: row.features.toArray())
        .collect()
    )

    # Neighbours are queried on the fitted array itself, so each point is normally
    # its own first neighbour: n_neighbors=5 leaves 4 genuine neighbours
    knn = NearestNeighbors(n_neighbors=5)
    knn.fit(arr)
    _, indices = knn.kneighbors(arr)

    synth = create_synthetic_data(arr, indices, need)
    sdf = spark.createDataFrame(
        [(Vectors.dense(v), int(c), 1) for v in synth],
        ["features", "cluster", "class"]
    )
    synthetic_data_df.append(sdf)
    sdf.show(5)  # show() prints the table itself and returns None

+--------------------+-------+-----+
|            features|cluster|class|
+--------------------+-------+-----+
|[-0.8960348663346...|      1|    1|
|[-5.7056059364894...|      1|    1|
|[1.35894502952261...|      1|    1|
|[0.02706275834135...|      1|    1|
|[-3.1754138171678...|      1|    1|
+--------------------+-------+-----+
only showing top 5 rows

+--------------------+-------+-----+
|            features|cluster|class|
+--------------------+-------+-----+
|[-4.2286850048313...|      2|    1|
|[-4.6006999442901...|      2|    1|
|[-5.2722095714337...|      2|    1|
|[-11.977417456294...|      2|    1|
|[-13.040401699504...|      2|    1|
+--------------------+-------+-----+
only showing top 5 rows

+--------------------+-------+-----+
|            features|cluster|class|
+--------------------+-------+-----+
|[-12.867002125013...|      0|    1|
|[-17.881249407430...|      0|    1|
|[-17.226640691476...|      0|    1|
|[-17.361145588687...|      0|    1|
(Table truncated in this export.)

In [72]:
# Union the per-cluster synthetic DataFrames into one
if synthetic_data_df:
    all_synthetic = synthetic_data_df[0]
    for sdf in synthetic_data_df[1:]:
        all_synthetic = all_synthetic.union(sdf)
else:
    all_synthetic = spark.createDataFrame([], clustered_class1_df.schema)

In [73]:
# Assemble the majority-class rows into feature vectors labelled 0
majority_vecs = assembler \
    .transform(train_df.filter(F.col("class") == 0)) \
    .select("features") \
    .withColumn("class", F.lit(0))
minority_vecs  = clustered_class1_df.select("features", "class")
synthetic_vecs = all_synthetic.select("features", "class")

# Balanced training set: majority + original minority + synthetic minority
balanced_train_df = majority_vecs \
    .union(minority_vecs) \
    .union(synthetic_vecs)

balanced_train_df.show(5)
balanced_train_df.groupBy("class").count().show()


+--------------------+-----+
|            features|class|
+--------------------+-----+
|[-56.407509631329...|    0|
|[-36.802319908874...|    0|
|[-36.510583170797...|    0|
|[-34.591213470459...|    0|
|[-34.148233651352...|    0|
+--------------------+-----+
only showing top 5 rows

+-----+------+
|class| count|
+-----+------+
|    0|198901|
|    1|198901|
+-----+------+



In [74]:
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Oversampling runtime: {elapsed_time:.2f} seconds")

Oversampling runtime: 38.97 seconds


In [75]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
test_df = assembler \
    .transform(test_df)\
    .select("features", "Class")
test_df = test_df.withColumnRenamed("Class", "class")

In [76]:
print('balanced_train_df_partition',balanced_train_df.rdd.getNumPartitions())
print('test_df_partition', test_df.rdd.getNumPartitions())

balanced_train_df_partition 10
test_df_partition 2


Change Number of Partitions

In [77]:
# train_rdd = balanced_train_df.rdd.repartition(3).cache()
# test_rdd = test_df.rdd.repartition(2).cache()

# print('training - #partitions:',train_rdd.getNumPartitions())
# print('testing - #partitions:', test_rdd.getNumPartitions())

Decision tree

In [78]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_df = balanced_train_df  # train on the rebalanced set

dt = DecisionTreeClassifier(labelCol="class", featuresCol="features", maxDepth=5)
model = dt.fit(train_df)
predictions = model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(
    labelCol="class",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")
predictions.select("features", "class", "prediction").show(5)


Accuracy: 93.81%
+--------------------+-----+----------+
|            features|class|prediction|
+--------------------+-----+----------+
|[-33.404081610779...|    0|       1.0|
|[-33.017174430628...|    0|       1.0|
|[-32.962809811697...|    0|       1.0|
|[-32.273469750819...|    0|       1.0|
|[-30.552380043581...|    1|       1.0|
+--------------------+-----+----------+
only showing top 5 rows
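Accuracy is a weak yardstick on this test set. A classifier that always predicts class 0 would already score about 99.83% on the 85556 test rows, above the 93.81% reported here, so this figure says little about how well fraud is detected. The sketch below computes that trivial baseline from the test-set counts printed earlier; `majority_baseline_accuracy` is a hypothetical helper.

```python
def majority_baseline_accuracy(counts):
    """Accuracy of a classifier that always predicts the most frequent class."""
    return max(counts.values()) / sum(counts.values())

test_counts = {0: 85414, 1: 142}   # test-set class counts from the split above
baseline = majority_baseline_accuracy(test_counts)
print(f"Always-predict-0 accuracy: {baseline * 100:.2f}%")
```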



F1 score (weighted over both classes)

In [79]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="class",
    predictionCol="prediction",
    metricName="f1"
)

f1_score = evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score:.2f}")


F1 Score: 0.97
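With `metricName="f1"`, `MulticlassClassificationEvaluator` reports the F1 score averaged over both classes and weighted by class frequency. With about 99.8% of test rows in class 0, that average is dominated by the majority class. Spark can score the fraud class on its own with `metricName="fMeasureByLabel"` (or `"precisionByLabel"` / `"recallByLabel"`) together with `metricLabel=1.0` (Spark 3.0 and later). The sketch below uses a **hypothetical** confusion matrix, since the notebook does not print one, to show how the weighted F1 can stay high while the minority-class F1 is near zero. `f1_scores` is an illustrative helper.

```python
import numpy as np

def f1_scores(cm):
    """Per-class and support-weighted F1 from a confusion matrix cm[true, predicted]."""
    cm = np.asarray(cm, dtype=float)
    tp = np.diag(cm)
    pred_totals, true_totals = cm.sum(axis=0), cm.sum(axis=1)
    precision = np.divide(tp, pred_totals, out=np.zeros_like(tp), where=pred_totals > 0)
    recall = np.divide(tp, true_totals, out=np.zeros_like(tp), where=true_totals > 0)
    denom = precision + recall
    f1 = np.divide(2 * precision * recall, denom, out=np.zeros_like(tp), where=denom > 0)
    # Weighted F1: each class's F1 weighted by its number of true rows
    weighted = float(np.sum(f1 * true_totals) / true_totals.sum())
    return f1, weighted

# Hypothetical confusion matrix (rows: true 0/1, columns: predicted 0/1), NOT from the notebook
cm = [[9000, 1000],
      [   5,   15]]
per_class, weighted = f1_scores(cm)
print(per_class.round(3), round(weighted, 3))
```

In this toy case, the weighted F1 is about 0.95 while the fraud-class F1 is below 0.05, because 1000 false alarms swamp 15 true detections.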
