### Environment Setup

In [1]:
import os
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql.window import Window

In [2]:
# Maven coordinates of the Kafka connectors, handed to spark-submit via --packages.
# PYSPARK_SUBMIT_ARGS is only read when the JVM gateway is launched, so this cell
# must run before the SparkSession is created.
# NOTE(review): connectors are pinned to Spark 3.2.0 / Scala 2.12 — confirm this
# matches the installed PySpark runtime; the DStream connector
# (spark-streaming-kafka) may not be needed for Structured Streaming at all.
KAFKA_CONNECTOR_PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages " + ",".join(KAFKA_CONNECTOR_PACKAGES) + " pyspark-shell"

In [3]:
# Create the Spark session for the streaming job, or reuse the one already
# running in this kernel (getOrCreate). Column names are resolved
# case-sensitively, and the legacy time-parser policy is selected.
spark_session = (
    SparkSession.builder
    .appName("real-time-analytics")
    .config("spark.sql.caseSensitive", "true")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
# Only surface ERROR-level JVM log messages in the notebook output.
spark_session.sparkContext.setLogLevel("ERROR")

25/02/05 03:24:19 WARN Utils: Your hostname, osbdet resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
25/02/05 03:24:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c307520c-0498-4e9f-a310-c7a0a383481c;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-log

In [4]:
# Point the Hadoop S3A filesystem at the local S3-compatible object store
# (presumably MinIO on localhost:9000 — confirm), using path-style addressing.
# Credentials/endpoint come from the environment when set; otherwise they fall
# back to the local sandbox defaults so Restart & Run All still works. Export
# S3A_ACCESS_KEY / S3A_SECRET_KEY rather than editing real secrets into this cell.
# NOTE(review): `_jsc` is a private PySpark handle; nothing in the cells shown
# reads s3a:// paths — presumably later cells do.
hadoop_conf = spark_session.sparkContext._jsc.hadoopConfiguration()
s3a_settings = {
    "fs.s3a.access.key": os.environ.get("S3A_ACCESS_KEY", "s3access"),
    "fs.s3a.secret.key": os.environ.get("S3A_SECRET_KEY", "_s3access123$"),
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.endpoint": os.environ.get("S3A_ENDPOINT", "http://localhost:9000"),
}
for conf_key, conf_value in s3a_settings.items():
    hadoop_conf.set(conf_key, conf_value)

In [5]:
# Micro-batch trigger interval for the streaming query.
trigger_processingTime = "5 seconds"
# Kafka topic the transaction messages are consumed from.
kafka_topic = "transactions_v1"
# DDL schema handed to from_csv to parse each message; field order must match the
# column order of the CSV payload. Several fields (timestamp, dob, unix_time,
# is_fraud, ...) are kept as STRING — presumably converted later; confirm.
schema_csv = """
        timestamp STRING,
        cc_num STRING,
        merchant STRING,
        category STRING,
        amt DOUBLE,
        first STRING,
        last STRING,
        gender STRING,
        street STRING,
        city STRING,
        state STRING,
        zip INT,
        lat DOUBLE,
        long DOUBLE,
        city_pop INT,
        job STRING,
        dob STRING,
        trans_num STRING,
        unix_time STRING,
        merch_lat DOUBLE,
        merch_long DOUBLE,
        is_fraud STRING
    """

### Streaming

In [6]:
# Streaming source: subscribe to the transactions topic on the local Kafka broker.
# "earliest" makes a query without an existing checkpoint begin at the oldest
# retained offset instead of only new messages.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}
raw_df = spark_session.readStream.format("kafka").options(**kafka_source_options).load()

In [7]:
# Inspect the Kafka source schema: key and value arrive as binary, so the value
# has to be cast to a string before it can be parsed.
raw_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# Decode each Kafka record's binary value to text, parse it as a ';'-separated
# CSV line against schema_csv, then flatten the parsed struct into columns.
transaction_text_df = raw_df.select(F.col("value").cast("string").alias("transaction"))
parsed_df = transaction_text_df.select(
    F.from_csv(F.col("transaction"), schema_csv, {"sep": ";"}).alias("data")
)
messages_df = parsed_df.select("data.*")

In [10]:
# Confirm the parsed columns and their types line up with schema_csv.
messages_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- cc_num: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: string (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: string (nullable = true)



In [14]:
# Print each micro-batch of parsed transactions to the console every
# trigger_processingTime ("truncate" shortens long cell values). The query runs in
# the background until it is stopped.
#
# Re-running this cell without stopping the previous query would start a second,
# untracked console query that a later `sink.stop()` cannot reach. Naming the
# query and stopping any active query with that name first makes the cell safe
# to re-run. Spark also refuses to start two active queries with the same name.
CONSOLE_QUERY_NAME = "transactions_console"
for active_query in spark_session.streams.active:
    if active_query.name == CONSOLE_QUERY_NAME:
        active_query.stop()

sink = (
    messages_df.writeStream
    .queryName(CONSOLE_QUERY_NAME)
    .outputMode("append")
    .trigger(processingTime=trigger_processingTime)
    .format("console")
    .option("truncate", "true")
    .start()
)

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------


                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----------------+-----------+--------------------+--------------+------+-----------+--------+------+--------------------+----------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|       timestamp|     cc_num|            merchant|      category|   amt|      first|    last|gender|              street|            city|state|  zip|    lat|     long|city_pop|                 job|       dob|           trans_num| unix_time|merch_lat| merch_long|is_fraud|
+----------------+-----------+--------------------+--------------+------+-----------+--------+------+--------------------+----------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|21/06/2020 12:14|3.57303E+15|fraud_Sporer-Keebler| personal_care| 29.84|     Joa

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
-------------------------------------------
Batch: 1
-------------------------------------------
+----------------+-----------+--------------------+-------------+-----+---------+--------+------+--------------------+----------------+-----+-----+-------+--------+--------+--------------------+----------+--------------------+----------+---------+----------+--------+
|       timestamp|     cc_num|            merchant|     category|  amt|    first|    last|gender|              street|            city|state|  zip|    lat|    long|city_pop|                 job|       dob|           trans_num| unix_time|merch_lat|merch_long|is_fraud|
+----------------+-----------+--------------------+-------------+-----+---------+--------+------+--------------------+----------------+-----+-----+-------+--------+--------+--------------------+----------+--------------------+----------+---------+----------+--------+
|2

In [16]:
# Stop the console query started above. Also stop any other streaming query still
# active in this session, such as ones leaked by re-running the start cell, so no
# background micro-batches keep printing after this point.
sink.stop()
for leftover_query in spark_session.streams.active:
    leftover_query.stop()

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----------------+-----------+--------------------+--------------+------+---------+----------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|       timestamp|     cc_num|            merchant|      category|   amt|    first|      last|gender|              street|         city|state|  zip|    lat|     long|city_pop|                 job|       dob|           trans_num| unix_time|merch_lat| merch_long|is_fraud|
+----------------+-----------+--------------------+--------------+------+---------+----------+------+--------------------+-------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+---------+-----------+--------+
|21/06/2020 18:27|3.53113E+15|  fraud_Schulist Ltd|   food_dining| 74.91|   Shelby|  Mitch