# Prototyping
A production-ready version of this notebook is saved in the corresponding Python script.

## Environment Config

##### Libraries/Packages

In [1]:
import os

from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StringIndexerModel, OneHotEncoder

from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from pyspark.sql.window import Window

In [2]:
# Extra Maven packages for spark-submit: the Kafka connectors built for Spark 3.5.0 / Scala 2.12
# and hadoop-aws for the s3a:// filesystem. This must be set before the SparkSession below is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = " ".join([
    "--packages",
    ",".join([
        "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
        "org.apache.hadoop:hadoop-aws:3.3.1",
    ]),
    "pyspark-shell",
])

##### Spark Session Config

In [3]:
# Create (or reuse) the session with case-sensitive column resolution and the
# LEGACY time-parser policy, then quieten Spark's logging and report the version.
spark_session = (
    SparkSession.builder
    .appName("real-time-analytics")
    .config("spark.sql.caseSensitive", "true")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
spark_session.sparkContext.setLogLevel("ERROR")
print("This cluster relies on Spark '{}'".format(spark_session.version))

25/02/10 05:40:39 WARN Utils: Your hostname, osbdet resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
25/02/10 05:40:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/osbdet/.jupyter_venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-edfa3e3b-c645-47e1-94bb-338d288bc1d0;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in 

This cluster relies on Spark '3.5.0'


In [4]:
# S3A filesystem settings for the object store holding the raw transaction history.
# Credentials and endpoint can be overridden via the S3_ACCESS_KEY / S3_SECRET_KEY /
# S3_ENDPOINT environment variables. The fallbacks are the local sandbox values and
# must not be reused against any shared or production object store.
hadoop_conf = spark_session.sparkContext._jsc.hadoopConfiguration()
s3a_settings = {
    "fs.s3a.access.key": os.environ.get("S3_ACCESS_KEY", "s3access"),
    "fs.s3a.secret.key": os.environ.get("S3_SECRET_KEY", "_s3access123$"),
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.endpoint": os.environ.get("S3_ENDPOINT", "http://localhost:9000"),
}
for setting_name, setting_value in s3a_settings.items():
    hadoop_conf.set(setting_name, setting_value)

## Application Config

In [5]:
# local working directory holding the streaming checkpoint and the trained model artifacts
project_dir = "/home/osbdet/notebooks/real-time-analytics"

kafka_topic_input = "transactions_raw_v1" # topic name for input transaction events
kafka_topic_output = "transactions_classified_v1" # topic name for output transaction classification events
kafka_bootstrap_server_address = "localhost:9092" # server address for kafka connection
kafka_streaming_checkpoint_filepath = f"{project_dir}/streaming_checkpoint" # checkpoint directory of the streaming query


s3_files_timestamp_col = "trans_date_trans_time" # timestamp col name of raw events landed in object storage
# number of most recent stored transactions used for the historical features;
# get_transaction_history requires strictly more stored files than this before a card is scored
num_historical_transactions = 3
string_indexer_filepath = f"{project_dir}/classifier/string_indexer" # filepath where the fitted category StringIndexerModel is stored
rf_classifier_filepath = f"{project_dir}/classifier/model" # filepath where classifier model is stored

##### Schema Config

The configuration below is needed because no schema registry is available:

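* `schema` is a Python string literal holding the record schema in Spark DDL format
* `schema` must match the Avro schema used when serializing the .csv records (same fields, same order); note that `cc_num` is declared `BIGINT` in the DDL but `string` in Avro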

avro schema (see ingestion_engine.xml for further details):

`{`<br>
`  "name": "Transaction",`<br>
`  "namespace": "com.example",`<br>
`  "type": "record",`<br>
`  "fields": [`<br>
`      {"name": "cc_num", "type": "string"},`<br>
`      {"name": "trans_num", "type": "string"},`<br>
`      {"name": "trans_date_trans_time", "type": "string"},`<br>
`      {"name": "merchant", "type": "string"},`<br>
`      {"name": "category", "type": "string"},`<br>
`      {"name": "amt", "type": "double"},`<br>
`      {"name": "first", "type": "string"},`<br>
`      {"name": "last", "type": "string"},`<br>
`      {"name": "gender", "type": "string"},`<br>
`      {"name": "street", "type": "string"},`<br>
`      {"name": "city", "type": "string"},`<br>
`      {"name": "state", "type": "string"},`<br>
`      {"name": "zip", "type": "int"},`<br>
`      {"name": "lat", "type": "double"},`<br>
`      {"name": "long", "type": "double"},`<br>
`      {"name": "city_pop", "type": "int"},`<br>
`      {"name": "job", "type": "string"},`<br>
`      {"name": "dob", "type": "string"},`<br>
`      {"name": "unix_time", "type": "long"},`<br>
`      {"name": "merch_lat", "type": "double"},`<br>
`      {"name": "merch_long", "type": "double"}`<br>
`  ]` <br>
`}`

In [6]:
schema = """
        cc_num BIGINT,
        trans_num STRING,
        trans_date_trans_time STRING,
        merchant STRING,
        category STRING,
        amt DOUBLE,
        first STRING,
        last STRING,
        gender STRING,
        street STRING,
        city STRING,
        state STRING,
        zip INT,
        lat DOUBLE,
        long DOUBLE,
        city_pop INT,
        job STRING,
        dob STRING,
        unix_time BIGINT,
        merch_lat DOUBLE,
        merch_long DOUBLE
    """

## Stream Processing

### Business Logic

In [7]:
def get_transaction_history(cc_num, num_historical_transactions):
    """Load the most recent stored transactions of a card from object storage.

    Reads every Parquet file under ``s3a://raw-transactions/{cc_num}``. History is only
    returned when strictly more than ``num_historical_transactions`` distinct files exist.

    Args:
        cc_num: credit card number whose history is requested (used as the S3 prefix).
        num_historical_transactions: number of most recent transactions to return.

    Returns:
        A DataFrame with the ``num_historical_transactions`` most recent rows, ordered newest
        first. ``trans_date_trans_time`` is parsed to a timestamp, and only the columns needed
        for enrichment are kept. Returns None when there is not enough history or the read
        fails (the error is printed, not raised).
    """
    path = f"s3a://raw-transactions/{cc_num}"
    relevant_columns = ["cc_num", "trans_date_trans_time", "amt", "lat", "long", "merch_lat", "merch_long", "category"]

    try:
        # Read the Parquet files and add file_name column
        data = spark_session.read.parquet(path).withColumn("file_name", f.input_file_name())

        # Count unique files
        file_count = data.select("file_name").distinct().count()

        # Ensure enough files exist
        if file_count <= num_historical_transactions:
            print("not enough history")
            return None

        # Parse before ordering: the stored value is a 'dd/MM/yyyy HH:mm' string, whose
        # lexicographic order is not chronological. Unparseable values become null and
        # sort last in descending order.
        data_parsed = data.withColumn(
            "trans_date_trans_time", f.to_timestamp("trans_date_trans_time", "dd/MM/yyyy HH:mm")
        )

        # Keep the most recent transactions only
        return data_parsed\
                .orderBy(f.col("trans_date_trans_time").desc())\
                .limit(num_historical_transactions)\
                .select(relevant_columns)

    except Exception as e:
        # Best effort: a missing prefix or unreadable files mean "no usable history".
        print(f"Error reading from {path}: {e}")
        return None

In [8]:
def clean_transaction(df_transaction):
    """Turn raw transaction rows into model-ready columns.

    - parses ``trans_date_trans_time`` ('dd/MM/yyyy HH:mm') to a timestamp and ``dob``
      ('dd/MM/yyyy') to a date;
    - derives hour, day of week, month, year and a weekend flag from the transaction time;
    - derives ``customer_age`` (days between dob and the transaction / 365, rounded) and
      ``distance`` (Euclidean distance between card-holder and merchant lat/long);
    - encodes ``gender`` as 1 for "Male" and 0 otherwise;
    - drops personal / free-text columns that are not used as features.

    NOTE(review): the sample row at the end of this notebook carries gender "F". If the raw
    feed uses "M"/"F", the "Male" comparison maps every row to 0. Confirm against the
    encoding used at training time before changing it.
    """
    txn_ts = f.col("trans_date_trans_time")
    unused_columns = ["merchant", "first", "last", "job", "dob", "street", "city", "state", "zip", "city_pop"]

    df_parsed = (
        df_transaction
        .withColumn("trans_date_trans_time", f.to_timestamp("trans_date_trans_time", "dd/MM/yyyy HH:mm"))
        .withColumn("dob", f.to_date(f.col("dob"), "dd/MM/yyyy"))
    )

    # dayofweek numbers days from 1 (Sunday) to 7 (Saturday), hence the weekend test on {1, 7}.
    df_calendar = (
        df_parsed
        .withColumn("hour", f.hour(txn_ts))
        .withColumn("day", f.dayofweek(txn_ts))
        .withColumn("month", f.month(txn_ts))
        .withColumn("year", f.year(txn_ts))
        .withColumn("weekend", f.when(f.col("day").isin(1, 7), 1).otherwise(0))
    )

    age_in_years = f.round(f.datediff(txn_ts, f.col("dob")) / 365, 0).cast('Integer')
    lat_delta = f.col("lat") - f.col("merch_lat")
    long_delta = f.col("long") - f.col("merch_long")
    merchant_distance = f.sqrt(f.pow(lat_delta, 2) + f.pow(long_delta, 2))

    return (
        df_calendar
        .withColumn("customer_age", age_in_years)
        .withColumn("distance", merchant_distance)
        .withColumn("gender", f.when(f.col("gender") == "Male", 1).otherwise(0))
        .drop(*unused_columns)
    )

In [9]:
def enrich_transaction(df_transaction, df_transaction_history):
    """Append per-card historical statistics to each transaction row.

    Args:
        df_transaction: cleaned transaction rows to enrich.
        df_transaction_history: history rows as returned by get_transaction_history (needs
            cc_num, trans_date_trans_time as a timestamp, amt, lat, long, merch_lat,
            merch_long, category).

    Returns:
        ``df_transaction`` cross-joined with one row of statistics. The statistics are
        appended in this order: historical_mean_amt, historical_std_amt,
        historical_max_amt, historical_min_amt, historical_avg_time_diff,
        historical_avg_distance_from_merchant, historical_std_distance_from_merchant,
        historical_num_categories. The order matters because vectorize_transaction
        assembles features in DataFrame column order.

    ``historical_avg_time_diff`` is the mean gap, in seconds, between consecutive
    historical transactions. NOTE(review): make sure the training pipeline computes this
    feature the same way.
    """
    window = Window.partitionBy("cc_num").orderBy(f.col("trans_date_trans_time"))

    # Per-row helpers: seconds since the previous transaction (null for the oldest one,
    # ignored by mean) and card-holder-to-merchant distance.
    df_history_features = df_transaction_history\
            .withColumn("prev_trans_time", f.lag("trans_date_trans_time").over(window))\
            .withColumn(
                "time_diff",
                f.col("trans_date_trans_time").cast("long") - f.col("prev_trans_time").cast("long")
            )\
            .withColumn(
                "distance", f.sqrt((f.col("lat") - f.col("merch_lat"))**2 + (f.col("long") - f.col("merch_long"))**2)
            )

    # All statistics in a single aggregation pass (column order preserved, see docstring).
    df_historical_stats = df_history_features\
            .agg(
                f.mean(f.col("amt")).alias("historical_mean_amt"),
                f.stddev(f.col("amt")).alias("historical_std_amt"),
                f.max(f.col("amt")).alias("historical_max_amt"),
                f.min(f.col("amt")).alias("historical_min_amt"),
                f.mean(f.col("time_diff")).alias("historical_avg_time_diff"),
                f.mean(f.col("distance")).alias("historical_avg_distance_from_merchant"),
                f.stddev(f.col("distance")).alias("historical_std_distance_from_merchant"),
                f.countDistinct("category").cast("int").alias("historical_num_categories")
            )

    return df_transaction.crossJoin(df_historical_stats)

In [10]:
def encode_transaction(df_transaction):
    """One-hot encode the ``category`` column.

    Loads the persisted StringIndexerModel from ``string_indexer_filepath``. Per the original
    note, it maps inputCol="category" to outputCol="category_index". The index is then
    one-hot encoded into ``category_encoded``, and the intermediate ``category_index``
    column is dropped. The original ``category`` column is kept.

    NOTE(review): the OneHotEncoder is fit on the current micro-batch. Its vector size
    should come from the indexer's label metadata rather than the batch contents; confirm
    this, or persist a fitted OneHotEncoderModel next to the indexer.
    """
    category_indexer = StringIndexerModel.load(string_indexer_filepath)
    df_with_index = category_indexer.transform(df_transaction)

    one_hot = OneHotEncoder(inputCol="category_index", outputCol="category_encoded")
    one_hot_model = one_hot.fit(df_with_index)

    return one_hot_model.transform(df_with_index).drop("category_index")
    

In [11]:
def vectorize_transaction(df_transaction):
    """Assemble the model input columns into a single ``features`` vector column.

    Every column except trans_num, trans_date_trans_time and category is used, in the
    DataFrame's column order. That order presumably has to match the one used when the
    classifier was trained; confirm against the training pipeline.

    NOTE(review): cc_num and unix_time are not excluded, so they end up as features.
    """
    non_feature_columns = {"trans_num", "trans_date_trans_time", "category"}
    input_columns = [name for name in df_transaction.columns if name not in non_feature_columns]

    return VectorAssembler(inputCols=input_columns, outputCol="features").transform(df_transaction)
    

In [23]:
def construct_classification_event(df_transaction_classified):
    """Serialize classified transactions into Kafka key/value records.

    The ML-internal columns (category_encoded, features, rawPrediction, probability) are
    dropped first. Each remaining row becomes ``key`` = trans_num and ``value`` = JSON
    object of all remaining columns, including the model's ``prediction``.
    """
    df_classification_event = df_transaction_classified.drop("category_encoded", "features", "rawPrediction", "probability")
    # Serialize the reduced frame (not the full classified one) so vector columns are not emitted.
    df_classification_event_json = df_classification_event.selectExpr("trans_num AS key", "to_json(struct(*)) AS value")

    return df_classification_event_json

In [18]:
def _classify_card_transactions(df_card_transactions, cc_num, rf_model):
    """Classify one card's transactions from a micro-batch and publish the events to Kafka.

    Writes nothing when the card does not have enough stored history.
    """
    trans_nums = ", ".join(str(row["trans_num"]) for row in df_card_transactions.select("trans_num").collect())
    print(f"Processing trans_num={trans_nums} for cc_num={cc_num}")

    # retrieve transaction history
    df_transaction_history = get_transaction_history(cc_num, num_historical_transactions)
    if df_transaction_history is None:
        print(f"skipping over trans_num={trans_nums} for cc_num={cc_num} due to lack of history")
        return

    # clean raw transaction data
    df_transaction_clean = clean_transaction(df_card_transactions)
    print("clean transaction:\n")
    df_transaction_clean.printSchema()

    # enrich with historical statistics
    df_transaction_enriched = enrich_transaction(df_transaction_clean, df_transaction_history)
    print("enriched transaction:\n")
    df_transaction_enriched.printSchema()

    # encode transaction
    df_transaction_encoded = encode_transaction(df_transaction_enriched)
    print("encoded transaction:\n")
    df_transaction_encoded.printSchema()

    # vectorize transaction
    df_transaction_vectorized = vectorize_transaction(df_transaction_encoded)
    print("vectorized transaction:\n")
    df_transaction_vectorized.printSchema()

    # classify transaction
    df_transaction_classified = rf_model.transform(df_transaction_vectorized)
    print("Number of features in trained model: ", rf_model.numFeatures)
    print("Number of features in vectorized data: ", df_transaction_vectorized.select("features").first()[0].size)

    print("classified transaction:\n")
    df_transaction_classified.printSchema()

    # construct classification event
    df_transaction_classification_event = construct_classification_event(df_transaction_classified)

    # Batch write of the classification events to Kafka. A checkpoint location only
    # applies to streaming writes, so none is set here.
    df_transaction_classification_event.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_server_address) \
        .option("topic", kafka_topic_output) \
        .save()


def process_transaction(df_transaction, batch_id):
    """foreachBatch handler: classify every transaction in a micro-batch.

    Args:
        df_transaction: micro-batch of deserialized transaction rows.
        batch_id: micro-batch id supplied by Structured Streaming.

    Each distinct cc_num in the batch is handled separately, so every transaction is
    enriched with its own card's history rather than with the history of the batch's
    first row. Rows with a null cc_num are skipped.
    """
    if df_transaction.isEmpty():
        print(f"Skipping batch {batch_id} as it contains no data")
        return

    # The batch is scanned by several actions below (distinct cards, per-card filters,
    # writes). Cache it so the Kafka source is not re-read for each one.
    df_transaction = df_transaction.persist()
    try:
        print("raw transaction:\n")
        df_transaction.printSchema()

        cc_nums = [row["cc_num"] for row in df_transaction.select("cc_num").distinct().collect()]
        if None in cc_nums:
            print(f"batch {batch_id}: skipping transactions without cc_num")

        # load classifier once per batch instead of once per card
        loaded_rf_model = RandomForestClassificationModel.load(rf_classifier_filepath)

        for cc_num in cc_nums:
            if cc_num is None:
                continue
            _classify_card_transactions(
                df_transaction.where(f.col("cc_num") == cc_num), cc_num, loaded_rf_model
            )
    finally:
        df_transaction.unpersist()

    return

### Data Source

In [19]:
# Kafka source: subscribe to the raw transaction topic, starting from the latest offsets
# when no checkpoint exists yet.
kafka_source_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_server_address,
    "subscribe": kafka_topic_input,
    "startingOffsets": "latest",
}
df_transaction_raw = (
    spark_session.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

df_transaction_raw.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [20]:
# Kafka record values are CSV lines; decode them to text, parse them with the DDL `schema`,
# and flatten the resulting struct into top-level columns.
csv_line = f.col("value").cast("string")
df_transaction_deserialized = (
    df_transaction_raw
    .select(f.from_csv(csv_line, schema).alias("data"))
    .select("data.*")
)

df_transaction_deserialized.printSchema()

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)



### Data Sink

In [21]:
# Run process_transaction on every micro-batch. Progress is checkpointed at the configured
# location (rather than a duplicated literal) so a restart resumes from the last committed offsets.
sink = df_transaction_deserialized \
    .writeStream \
    .foreachBatch(process_transaction) \
    .option("checkpointLocation", kafka_streaming_checkpoint_filepath) \
    .start()

                                                                                

raw transaction:

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)



                                                                                

Processing trans_num=61dca41a9728ea5fd6db99efd59768f8 for cc_num=6010000000000000


                                                                                

clean transaction:

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: integer (nullable = false)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- customer_age: integer (nullable = true)
 |-- distance: double (nullable = true)

enriched transaction:

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: integer (n

                                                                                

encoded transaction:

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: integer (nullable = false)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- customer_age: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- historical_mean_amt: double (nullable = true)
 |-- historical_std_amt: double (nullable = true)
 |-- historical_max_amt: double (nullable = true)
 |-- historical_min_amt: double (nullable = true)
 |-- historical_avg_time_diff: double (nullable = true)
 

                                                                                

Number of features in trained model:  36


                                                                                

Number of features in vectorized data:  36
classified transaction:

root
 |-- cc_num: long (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: integer (nullable = false)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- unix_time: long (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekend: integer (nullable = false)
 |-- customer_age: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- historical_mean_amt: double (nullable = true)
 |-- historical_std_amt: double (nullable = true)
 |-- historical_max_amt: double (nullable = true)
 |-- historical_min_amt: double (nullable = true)
 |-- histor

                                                                                

In [22]:
# Stop the streaming query started in the "Data Sink" section.
sink.stop()

## Test Data

In [18]:
# One-row DataFrame for manually exercising the processing functions.
# The StructType is stored as `sample_schema`, not `schema`. Reusing `schema` would replace
# the DDL string that the streaming source passes to from_csv whenever this cell is run.
sample_schema = StructType([
    StructField("cc_num", LongType(), True),
    StructField("trans_num", StringType(), True),
    StructField("trans_date_trans_time", StringType(), True),
    StructField("merchant", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amt", DoubleType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", IntegerType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("city_pop", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("unix_time", LongType(), True),
    StructField("merch_lat", DoubleType(), True),
    StructField("merch_long", DoubleType(), True)
])

# Sample data. Dates follow the 'dd/MM/yyyy HH:mm' and 'dd/MM/yyyy' patterns parsed by clean_transaction.
data_sample = [
    (6010000000000000, "txn0001", "07/02/2020 10:00", "merchant1", "category1", 100.0, "Alice", "Smith", "F", "123 Main St", "Austin", "TX", 78701, 30.2672, -97.7431, 1000000, "Engineer", "01/01/1990", 1615158000, 30.2672, -97.7431)
]

df_sample = spark_session.createDataFrame(data_sample, schema=sample_schema)