In [2]:
# Show which OS user the notebook's shell commands run as.
!whoami

irfanqs


In [3]:
# Show the working directory that shell commands are executed in.
!pwd

/home/irfanqs


In [4]:
!pip3 install pyspark pandas

Defaulting to user installation because normal site-packages is not writeable


In [5]:
!git clone https://github.com/Gallant4114/project-kafka.git

fatal: destination path 'project-kafka' already exists and is not an empty directory.


# Training

In [48]:
!cd project-kafka

In [17]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import os

# Filesystem locations of the input batches and of the trained models.
BATCH_DIR = "/home/irfanqs/project-kafka/batches"
MODEL_DIR = "/home/irfanqs/project-kafka/models"

# Make sure the model output directory exists before anything is saved to it.
os.makedirs(MODEL_DIR, exist_ok=True)

# Create the Spark session for training, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName("FraudCreditCard")
    .getOrCreate()
)

In [7]:
# Grab the SparkContext behind the session and print it to confirm the
# master URL and application name Spark is running with.
sc = spark.sparkContext
print(sc)

<SparkContext master=local[*] appName=FraudCreditCard>


In [18]:
# Numeric input features; each one is cast to double before being assembled
# into the single "features" vector column.
FEATURE_COLUMNS = ["amt", "lat", "long", "city_pop", "unix_time", "merch_lat", "merch_long"]

# The pipeline definition and the evaluator are identical for every batch,
# so they are built once instead of on every loop iteration.
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, lr])
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# Train, save and evaluate one independent model per batch file.
for i in range(3):
    batch_path = os.path.join(BATCH_DIR, f"batch_{i}.json")
    print(f"Reading {batch_path}")

    df = spark.read.option("multiline", "true").json(batch_path)

    # The loop variable is deliberately not called `col`: that name would
    # shadow pyspark.sql.functions.col, which this notebook imports elsewhere.
    for feature_name in FEATURE_COLUMNS:
        df = df.withColumn(feature_name, df[feature_name].cast("double"))
    df = df.withColumn("label", df["is_fraud"].cast("int"))

    # Fixed seed so the 80/20 split is the same on every run.
    (trainingData, testData) = df.randomSplit([0.8, 0.2], seed=42)

    # Fit the pipeline on the training split.
    model = pipeline.fit(trainingData)

    model_path = os.path.join(MODEL_DIR, f"fraud_model_batch_{i}")
    model.write().overwrite().save(model_path)
    print(f"Model from batch {i} saved to {model_path}")

    # Evaluate the model on the held-out split.
    print(f"\n--- Evaluating Model from batch {i} ---")

    # Cached because the predictions feed several Spark actions below; without
    # caching each action would recompute the whole transform.
    predictions = model.transform(testData).cache()
    total_predictions = predictions.count()

    if total_predictions > 0:
        correct_predictions = predictions.filter(predictions.label == predictions.prediction).count()
        accuracy = correct_predictions / total_predictions
        print(f"Accuracy on test data for batch {i}: {accuracy:.4f}")

        # NOTE(review): if fraud cases are rare in these batches, accuracy alone
        # says little about detection quality; AUC is reported alongside it.
        auc = evaluator.evaluate(predictions)
        print(f"Area Under ROC (AUC) on test data for batch {i}: {auc:.4f}")
    else:
        # Nothing to score: skip both metrics instead of evaluating an empty set.
        print(f"No predictions to evaluate for batch {i}.")

    predictions.unpersist()

print("\nAll models processed.")
spark.stop()

Reading /home/irfanqs/project-kafka/batches/batch_0.json
Model from batch 0 saved to /home/irfanqs/project-kafka/models/fraud_model_batch_0

--- Evaluating Model from batch 0 ---
Accuracy on test data for batch 0: 0.9981
Area Under ROC (AUC) on test data for batch 0: 0.9981
Reading /home/irfanqs/project-kafka/batches/batch_1.json
Model from batch 1 saved to /home/irfanqs/project-kafka/models/fraud_model_batch_1

--- Evaluating Model from batch 1 ---
Accuracy on test data for batch 1: 0.9905
Area Under ROC (AUC) on test data for batch 1: 0.9302
Reading /home/irfanqs/project-kafka/batches/batch_2.json
Model from batch 2 saved to /home/irfanqs/project-kafka/models/fraud_model_batch_2

--- Evaluating Model from batch 2 ---
Accuracy on test data for batch 2: 0.9924
Area Under ROC (AUC) on test data for batch 2: 0.8774

All models processed.


Dari hasil yang didapat, dapat disimpulkan bahwa model dari batch 0 memiliki akurasi (0.9981) dan AUC (0.9981) yang paling baik dibandingkan dengan model dari batch 1 dan batch 2.

# Evaluasi

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
from pyspark.sql.functions import col, udf
from pyspark.ml.linalg import Vector
import os

# Start (or reuse) the Spark session used for prediction.
spark = SparkSession.builder.appName("FraudPredictionLive") \
    .getOrCreate()

MODEL_DIR = "/home/irfanqs/project-kafka/models"
# Directory name of the saved model inside MODEL_DIR. This must be a relative
# name: os.path.join discards MODEL_DIR entirely when the second argument is
# an absolute path.
MODEL_TO_USE = "fraud_model_batch_0"
model_path = os.path.join(MODEL_DIR, MODEL_TO_USE)

# Load model
print(f"Loading model from: {model_path}")
try:
    loaded_model = PipelineModel.load(model_path)
except Exception as e:
    print(f"Error loading model: {e}")
    spark.stop()
    # Re-raise instead of calling exit(): in a notebook exit() shuts the kernel
    # down, whereas re-raising only aborts this cell and keeps the traceback.
    raise
else:
    print("Model loaded successfully!")

Loading model from: /home/irfanqs/project-kafka/models/fraud_model_batch_0


[Stage 0:>                                                          (0 + 1) / 1]                                                                                

Model loaded successfully!


In [6]:
# Three hand-written transactions, one tuple per row; the values follow the
# column order of the schema defined below.
new_data_raw = [
    (100.50, 40.71, -74.00, 8000000, 1678886400, 40.72, -74.01),
    (1500.00, 34.05, -118.25, 4000000, 1678887000, 34.06, -118.24),
    (50.00, 41.88, -87.63, 2700000, 1678887600, 41.87, -87.62),
]

# Explicit schema, built field by field; every column is nullable.
schema = (
    StructType()
    .add("amt", DoubleType(), True)
    .add("lat", DoubleType(), True)
    .add("long", DoubleType(), True)
    .add("city_pop", IntegerType(), True)
    .add("unix_time", IntegerType(), True)
    .add("merch_lat", DoubleType(), True)
    .add("merch_long", DoubleType(), True)
)

new_df = spark.createDataFrame(new_data_raw, schema)

# Display the input rows and their column types.
print("\nNew data for prediction:")
new_df.show()
new_df.printSchema()

# Run the loaded pipeline model on the new rows.
predictions = loaded_model.transform(new_df)

print("\nPredictions:")
predictions.select(
    "amt",
    "lat",
    "long",
    "probability",
    "prediction",
).show(truncate=False)
print("1 = fraud, 0 = not fraud")

# Release the Spark session now that prediction is finished.
spark.stop()


New data for prediction:
+------+-----+-------+--------+----------+---------+----------+
|   amt|  lat|   long|city_pop| unix_time|merch_lat|merch_long|
+------+-----+-------+--------+----------+---------+----------+
| 100.5|40.71|  -74.0| 8000000|1678886400|    40.72|    -74.01|
|1500.0|34.05|-118.25| 4000000|1678887000|    34.06|   -118.24|
|  50.0|41.88| -87.63| 2700000|1678887600|    41.87|    -87.62|
+------+-----+-------+--------+----------+---------+----------+

root
 |-- amt: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)


Predictions:
+------+-----+-------+-----------+----------+
|amt   |lat  |long   |probability|prediction|
+------+-----+-------+-----------+----------+
|100.5 |40.71|-74.0  |[0.0,1.0]  |1.0       |
|1500.0|34.05|-118.25|[0.0,1.0]  |1.0       |
|50