# Building Models to Predict Prospective Customers for MonPG Part 2 - Spark Streaming


Date: 12/10/2022

Version: 1.0

Environment: Python 3.10.5 and Anaconda 4.11.0 (64-bit)

#### Libraries used:

* Spark (via PySpark): a fast, general-purpose processing engine; we use its Structured Streaming API from Python.
* json: its `dumps` function serialises the streamed data before it is sent to the consumer.
* kafka3: a Python Kafka client, used to publish the state-wise predictions to a new topic.

## 2. Streaming Application using Spark Structured Streaming

In this notebook, we ingest the incoming data from two Kafka topics, `customer_data` and `bureau_data`. After ingestion, we transform the data into the format expected by the given ML pipeline model before running it through the model.

**NOTE**: Please ensure that the producer code `TopUpCustomerPredictionPart2_Step1Producer.ipynb` is running before executing any `writeStream` query here.

### 2.1 Creating the Spark Session

Let us start by creating a Spark session with the Kafka-Spark integration packages. We set the session time zone to `UTC`. Finally, since our schemas contain many columns, we raise `spark.sql.debug.maxToStringFields` so that the string representations of wide schemas and execution plans are not truncated.

Additionally, the JVM heap size can be raised so that large streamed outputs are displayed without interruption.
**Note 2**: Raising the maximum heap size is only a precaution on machines with ample memory; a long-running streaming query may still fail on the Monash VM server.
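One way to raise the driver heap size is to pass `--driver-memory` in `PYSPARK_SUBMIT_ARGS`. In client mode (which includes a notebook running Spark locally), `spark.driver.memory` cannot usefully be set on a `SparkConf` inside the application, because the driver JVM has already started by then. The sketch below uses a hypothetical helper, `build_submit_args`, to assemble that string; the `"4g"` value is illustrative and not a setting taken from this notebook.

```python
# Hypothetical helper: assembles PYSPARK_SUBMIT_ARGS before any SparkSession exists.
def build_submit_args(packages, driver_memory=None):
    """Return a PYSPARK_SUBMIT_ARGS string for a local PySpark session."""
    parts = []
    if driver_memory:
        # The driver heap size has to be fixed when the JVM is launched.
        parts += ["--driver-memory", driver_memory]
    parts += ["--packages", ",".join(packages)]
    # Trailing token used by the original cell when launching from Python.
    parts.append("pyspark-shell")
    return " ".join(parts)


KAFKA_PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
]

# "4g" is an illustrative value, not a setting taken from this notebook.
submit_args = build_submit_args(KAFKA_PACKAGES, driver_memory="4g")
# In the notebook, this string would be assigned to os.environ['PYSPARK_SUBMIT_ARGS']
# in the first cell, before the SparkSession is created.
```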

In [1]:
# Importing the OS library and spark structured streaming packages
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

In [2]:
# Importing the Kafka producer, the JSON serialiser and the Spark classes
from kafka3 import KafkaProducer
from json import dumps
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# local[2]: run Spark locally with 2 worker threads.
# Use "local[*]" to run with as many worker threads as there are logical cores on the machine.
master = "local[2]"

# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Top-up Prediction Streaming Service"

# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name).set("spark.sql.session.timeZone", "UTC")

spark = SparkSession.builder.config(conf=spark_conf).config('spark.sql.debug.maxToStringFields', 5000).getOrCreate()


In [3]:
# Suppressing Python warnings to keep the notebook output tidy
import warnings
warnings.filterwarnings('ignore')

In [4]:
# Hiding Spark warnings
# Note: If faced with issues in Kafka-Spark streaming, please set log level to WARN or INFO for execution details
spark.sparkContext.setLogLevel("OFF")

### 2.2 Ingesting the Streaming Data

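The incoming data for both streams is ingested with `spark.readStream` using the Kafka source. Each stream subscribes to its topic by name, and the `kafka.bootstrap.servers` option tells Spark which Kafka broker to read from. Finally, setting `startingOffsets` to `earliest` makes a newly started query read each topic from the oldest offset still retained by Kafka. This option only applies when a query starts without an existing checkpoint; resuming from where a previous query left off is handled by the query's checkpoint, not by `startingOffsets`.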
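The rule for where a Kafka-backed query starts can be summarised in a few lines. The following is a simplified plain-Python model, not Spark code, and `resolve_start_offset` is a hypothetical name; it only covers the `"earliest"` and `"latest"` string values (Spark also accepts a per-partition JSON specification). Because the console queries in this notebook do not set a `checkpointLocation`, each new query is expected to start without a checkpoint and therefore re-read from the earliest offset.

```python
# Simplified model (not Spark code) of where a Kafka source starts reading one partition.
def resolve_start_offset(checkpointed_offset, starting_offsets, earliest, latest):
    """Return the offset a query would begin reading from.

    checkpointed_offset: next offset recorded in the query's checkpoint, or None
    starting_offsets:    the readStream option value, "earliest" or "latest"
    earliest / latest:   oldest retained offset and log-end offset of the partition
    """
    if checkpointed_offset is not None:
        # A restarted query with a checkpoint resumes where it left off;
        # startingOffsets is ignored in this case.
        return checkpointed_offset
    if starting_offsets == "earliest":
        return earliest
    if starting_offsets == "latest":
        return latest
    raise ValueError(f"unsupported startingOffsets value: {starting_offsets!r}")
```

For example, a fresh query with `"earliest"` on a partition holding offsets 0 to 119 starts at 0, while a query restarted from a checkpoint recording offset 57 starts at 57 regardless of the option.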

In [5]:
# Ingesting the data from the customer topic
topic_customer = "customer_data"
customer_ingest = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic_customer) \
    .option("startingOffsets", 'earliest')\
    .load()

# Printing the schema of the ingested customer topic data
customer_ingest.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Ingesting the data from the bureau topic
topic_bureau = "bureau_data"
bureau_ingest = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic_bureau) \
    .option("startingOffsets", 'earliest')\
    .load()

# Printing the schema of the ingested bureau topic data
bureau_ingest.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### 2.3 Transforming the Streamed Data

With data ingestion set up, we can now transform the binary `value` column of each stream into a structured DataFrame with named, typed columns.

#### 2.3.1 Defining the Schemas

First, we prepare the schemas for the two streams. Since each Kafka message carries a JSON array (a batch) of records, each schema is an `ArrayType` wrapping a `StructType` that lists the fields of one record. We create one such schema for the customer data and another for the bureau data.

In [7]:
# Defining the schema for the bureau data
schema_bureau = ArrayType(StructType([

    StructField('ID', StringType(), True),
    StructField('SELF-INDICATOR', StringType(), True),
    StructField('MATCH-TYPE', StringType(), True),
    StructField('ACCT-TYPE', StringType(), True),
    StructField('CONTRIBUTOR-TYPE', StringType(), True),
    StructField('DATE-REPORTED', StringType(), True),
    StructField('OWNERSHIP-IND', StringType(), True),
    StructField('ACCOUNT-STATUS', StringType(), True),
    StructField('DISBURSED-DT', StringType(), True),
    StructField('CLOSE-DT', StringType(), True),
    StructField('LAST-PAYMENT-DATE', StringType(), True),
    StructField('CREDIT-LIMIT/SANC AMT', StringType(), True),
    StructField('DISBURSED-AMT/HIGH CREDIT', StringType(), True),
    StructField('INSTALLMENT-AMT', StringType(), True),
    StructField('CURRENT-BAL', StringType(), True),
    StructField('INSTALLMENT-FREQUENCY', StringType(), True),
    StructField('OVERDUE-AMT', StringType(), True),
    StructField('WRITE-OFF-AMT', StringType(), True),
    StructField('ASSET_CLASS', StringType(), True),
    StructField('REPORTED DATE - HIST', StringType(), True),
    StructField('DPD - HIST', StringType(), True),
    StructField('CUR BAL - HIST', StringType(), True),
    StructField('AMT OVERDUE - HIST', StringType(), True),
    StructField('AMT PAID - HIST', StringType(), True),
    StructField('TENURE', StringType(), True),
    StructField('ts', TimestampType(), True)
]))

In [8]:
# Defining the schema for the customer data
schema_customer = ArrayType(StructType([

    StructField('ID', StringType(), True),
    StructField('Frequency', StringType(), True),
    StructField('InstlmentMode', StringType(), True),
    StructField('LoanStatus', StringType(), True),
    StructField('PaymentMode', StringType(), True),
    StructField('BranchID', StringType(), True),
    StructField('Area', StringType(), True),
    StructField('Tenure', StringType(), True),
    StructField('AssetCost', StringType(), True),
    StructField('AmountFinance', StringType(), True),
    StructField('DisbursalAmount', StringType(), True),
    StructField('EMI', StringType(), True),
    StructField('DisbursalDate', StringType(), True),
    StructField('MaturityDAte', StringType(), True),
    StructField('AuthDate', StringType(), True),
    StructField('AssetID', StringType(), True),
    StructField('ManufacturerID', StringType(), True),
    StructField('SupplierID', StringType(), True),
    StructField('LTV', StringType(), True),
    StructField('SEX', StringType(), True),
    StructField('AGE', StringType(), True),
    StructField('MonthlyIncome', StringType(), True),
    StructField('City', StringType(), True),
    StructField('State', StringType(), True),
    StructField('ZiPCODE', StringType(), True),
    StructField('Top-up Month', StringType(), True),
    StructField('ts', TimestampType(), True)
]))

#### 2.3.2 Parsing the Data

With the schemas defined for both streams, we use them to give the ingested streams structure. This is done in three steps:

* Cast the binary `value` column of each Kafka message to a string and parse it as JSON with `from_json()`, using the schema defined above.
* Explode the parsed array so that each record becomes a separate row holding one unnested struct.
* Select the individual fields from the unnested struct, casting some of them to more appropriate datatypes. Columns that must be converted to integers but contain non-numeric characters are handled separately.
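The three steps can be mirrored in plain Python to show what happens to a single message. This is an illustration only: the message contents are hypothetical, it assumes the producer sends one JSON array per message, and `to_int` is a rough stand-in for Spark's `cast('int')` (Spark's string-to-integer cast has extra rules this helper does not reproduce).

```python
import json
from datetime import datetime

# Hypothetical message value: one JSON array of records per Kafka message,
# which is why the schemas wrap StructType in ArrayType.
raw_value = json.dumps([
    {"ID": "1", "Tenure": "48", "ts": "2022-10-13 21:58:18"},
    {"ID": "2", "Tenure": "abc", "ts": "2022-10-13 21:58:18"},
]).encode("utf-8")


def to_int(text):
    # Rough stand-in for Spark's cast('int'): unparsable input becomes None (null)
    # instead of raising an exception.
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


# Step 1: binary value -> string -> parsed JSON array (cast + from_json).
parsed_value = json.loads(raw_value.decode("utf-8"))

# Step 2: one row per array element (explode).
unnested_rows = list(parsed_value)

# Step 3: pick individual fields and cast their types (select + cast).
rows = [
    {
        "ID": r["ID"],
        "Tenure": to_int(r["Tenure"]),
        "ts": datetime.strptime(r["ts"], "%Y-%m-%d %H:%M:%S"),
    }
    for r in unnested_rows
]
```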

In [9]:
# Parsing the ingested customer data in json format
customer_json = customer_ingest.select(F.from_json(F.col("value").cast("string"), schema_customer).alias('parsed_value'))

# Checking the schema of the parsed value
customer_json.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- Frequency: string (nullable = true)
 |    |    |-- InstlmentMode: string (nullable = true)
 |    |    |-- LoanStatus: string (nullable = true)
 |    |    |-- PaymentMode: string (nullable = true)
 |    |    |-- BranchID: string (nullable = true)
 |    |    |-- Area: string (nullable = true)
 |    |    |-- Tenure: string (nullable = true)
 |    |    |-- AssetCost: string (nullable = true)
 |    |    |-- AmountFinance: string (nullable = true)
 |    |    |-- DisbursalAmount: string (nullable = true)
 |    |    |-- EMI: string (nullable = true)
 |    |    |-- DisbursalDate: string (nullable = true)
 |    |    |-- MaturityDAte: string (nullable = true)
 |    |    |-- AuthDate: string (nullable = true)
 |    |    |-- AssetID: string (nullable = true)
 |    |    |-- ManufacturerID: string (nullable = true)
 |    |    |-- SupplierID: string (nullable = true)

In [10]:
# Unnesting the json extracted customer data 
customer_explode = customer_json.select(F.explode(F.col("parsed_value")).alias('unnested_value'))

# Checking the schema of the unnested customer data
customer_explode.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- Frequency: string (nullable = true)
 |    |-- InstlmentMode: string (nullable = true)
 |    |-- LoanStatus: string (nullable = true)
 |    |-- PaymentMode: string (nullable = true)
 |    |-- BranchID: string (nullable = true)
 |    |-- Area: string (nullable = true)
 |    |-- Tenure: string (nullable = true)
 |    |-- AssetCost: string (nullable = true)
 |    |-- AmountFinance: string (nullable = true)
 |    |-- DisbursalAmount: string (nullable = true)
 |    |-- EMI: string (nullable = true)
 |    |-- DisbursalDate: string (nullable = true)
 |    |-- MaturityDAte: string (nullable = true)
 |    |-- AuthDate: string (nullable = true)
 |    |-- AssetID: string (nullable = true)
 |    |-- ManufacturerID: string (nullable = true)
 |    |-- SupplierID: string (nullable = true)
 |    |-- LTV: string (nullable = true)
 |    |-- SEX: string (nullable = true)
 |    |-- AGE: string (nullable = true)

In [11]:
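# Extracting the customer columns from the unnested struct
# (struct fields are resolved case-insensitively by default, so e.g. 'MaturityDate' matches 'MaturityDAte'
# and 'ZipCode' matches 'ZiPCODE')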
customer_df_formatted = customer_explode.select(
    F.col("unnested_value.ID").alias("ID"),
    F.col("unnested_value.Frequency").alias("Frequency"),
    F.col("unnested_value.InstlmentMode").alias("InstlmentMode"),
    F.col("unnested_value.LoanStatus").alias("LoanStatus"),
    F.col("unnested_value.PaymentMode").alias("PaymentMode"),
    F.col("unnested_value.BranchID").alias("BranchID"),
    F.col("unnested_value.Area").alias("Area"),
    F.col("unnested_value.Tenure").cast('int').alias("Tenure"),
    F.col("unnested_value.AssetCost").cast('int').alias("AssetCost"),
    F.col("unnested_value.AmountFinance").cast('double').alias("AmountFinance"),
    F.col("unnested_value.DisbursalAmount").cast('double').alias("DisbursalAmount"),
    F.col("unnested_value.EMI").cast('double').alias("EMI"),
    F.col("unnested_value.DisbursalDate").cast('timestamp').alias("DisbursalDate"),
    F.col("unnested_value.MaturityDate").cast('timestamp').alias("MaturityDate"),
    F.col("unnested_value.AuthDate").cast('timestamp').alias("AuthDate"),
    F.col("unnested_value.AssetID").alias("AssetID"),
    F.col("unnested_value.ManufacturerID").alias("ManufacturerID"),
    F.col("unnested_value.SupplierID").alias("SupplierID"),
    F.col("unnested_value.LTV").cast('double').alias("LTV"),
    F.col("unnested_value.Sex").alias("Sex"),
    F.col("unnested_value.Age").cast('int').alias("Age"),
    F.col("unnested_value.MonthlyIncome").cast('double').alias("MonthlyIncome"),
    F.col("unnested_value.City").alias("City"),
    F.col("unnested_value.State").alias("State"),
    F.col("unnested_value.ZipCode").alias("ZipCode"),
    F.col("unnested_value.Top-up Month").alias("Top-up Month"),
    F.col("unnested_value.ts").alias("ts")
)

# Checking the final formatted schema for the customer data
customer_df_formatted.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Frequency: string (nullable = true)
 |-- InstlmentMode: string (nullable = true)
 |-- LoanStatus: string (nullable = true)
 |-- PaymentMode: string (nullable = true)
 |-- BranchID: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- AssetCost: integer (nullable = true)
 |-- AmountFinance: double (nullable = true)
 |-- DisbursalAmount: double (nullable = true)
 |-- EMI: double (nullable = true)
 |-- DisbursalDate: timestamp (nullable = true)
 |-- MaturityDate: timestamp (nullable = true)
 |-- AuthDate: timestamp (nullable = true)
 |-- AssetID: string (nullable = true)
 |-- ManufacturerID: string (nullable = true)
 |-- SupplierID: string (nullable = true)
 |-- LTV: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- MonthlyIncome: double (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZipCode: string (nullable = true)

Now that we are done extracting the customer data, let us confirm that we have received the data in the correct form. We want to make sure that we have the proper data to be used in the subsequent aggregations. We can use `writeStream` to print the streaming data to the console. To avoid memory issues with the VM, let us select a few columns only.

In [22]:
# Checking the formatted customer data
query = customer_df_formatted.select('ID', 'ts', 'Top-Up Month')\
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

-------------------------------------------
Batch: 0
-------------------------------------------

+---+-------------------+-----------------+
| ID|                 ts|     Top-Up Month|
+---+-------------------+-----------------+
|  1|2022-10-13 21:58:18|      > 48 Months|
|  2|2022-10-13 21:58:18|No Top-up Service|
|  3|2022-10-13 21:58:18|     12-18 Months|
|  4|2022-10-13 21:58:18|     36-48 Months|
|  5|2022-10-13 21:58:18|     18-24 Months|
|  6|2022-10-13 21:58:18|     30-36 Months|
|  7|2022-10-13 21:58:18|      > 48 Months|
|  8|2022-10-13 21:58:18|     36-48 Months|
|  9|2022-10-13 21:58:18|No Top-up Service|
| 10|2022-10-13 21:58:18|No Top-up Service|
| 11|2022-10-13 21:58:18|No Top-up Service|
| 12|2022-10-13 21:58:18|No Top-up Service|
| 13|2022-10-13 21:58:18|No Top-up Service|
| 14|2022-10-13 21:58:18|No Top-up Service|
| 15|2022-10-13 21:58:18|No Top-up Service|
| 16|2022-10-13 21:58:18|No Top-up Service|
| 17|2022-10-13 21:58:18|No Top-up Service|
| 18|2022-10-13 21:58:18|No Top-up Service|
| 19|2022-10-13 21:58:18|No Top-up Service|
| 20|2022-10-13 21:58:18|No Top-

In [23]:
# Stopping the stream
query.stop()

We will take a similar approach for the bureau data as follows:

In [12]:
# Parsing the ingested bureau data in json format
bureau_json = bureau_ingest.select(F.from_json(F.col("value").cast("string"), schema_bureau).alias('parsed_value'))

# Checking the schema of the parsed value
bureau_json.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ID: string (nullable = true)
 |    |    |-- SELF-INDICATOR: string (nullable = true)
 |    |    |-- MATCH-TYPE: string (nullable = true)
 |    |    |-- ACCT-TYPE: string (nullable = true)
 |    |    |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |    |    |-- DATE-REPORTED: string (nullable = true)
 |    |    |-- OWNERSHIP-IND: string (nullable = true)
 |    |    |-- ACCOUNT-STATUS: string (nullable = true)
 |    |    |-- DISBURSED-DT: string (nullable = true)
 |    |    |-- CLOSE-DT: string (nullable = true)
 |    |    |-- LAST-PAYMENT-DATE: string (nullable = true)
 |    |    |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |    |    |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |    |    |-- INSTALLMENT-AMT: string (nullable = true)
 |    |    |-- CURRENT-BAL: string (nullable = true)
 |    |    |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |    |    |-- OVERDUE-AMT: string (nullable = true)

In [13]:
# Unnesting the json extracted bureau data 
bureau_explode = bureau_json.select(F.explode(F.col("parsed_value")).alias('unnested_value'))

# Checking the schema of the unnested bureau data
bureau_explode.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- SELF-INDICATOR: string (nullable = true)
 |    |-- MATCH-TYPE: string (nullable = true)
 |    |-- ACCT-TYPE: string (nullable = true)
 |    |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |    |-- DATE-REPORTED: string (nullable = true)
 |    |-- OWNERSHIP-IND: string (nullable = true)
 |    |-- ACCOUNT-STATUS: string (nullable = true)
 |    |-- DISBURSED-DT: string (nullable = true)
 |    |-- CLOSE-DT: string (nullable = true)
 |    |-- LAST-PAYMENT-DATE: string (nullable = true)
 |    |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |    |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |    |-- INSTALLMENT-AMT: string (nullable = true)
 |    |-- CURRENT-BAL: string (nullable = true)
 |    |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |    |-- OVERDUE-AMT: string (nullable = true)
 |    |-- WRITE-OFF-AMT: string (nullable = true)
 |    |-- ASSET_CLASS: string (nullable = true)

In [14]:
# Extracting the bureau columns from the unnested struct
bureau_df_formatted = bureau_explode.select(
    F.col("unnested_value.ID").alias("ID"),
    F.col("unnested_value.SELF-INDICATOR").alias("SELF-INDICATOR"),
    F.col("unnested_value.MATCH-TYPE").alias("MATCH-TYPE"),
    F.col("unnested_value.ACCT-TYPE").alias("ACCT-TYPE"),
    F.col("unnested_value.CONTRIBUTOR-TYPE").alias("CONTRIBUTOR-TYPE"),
    F.col("unnested_value.DATE-REPORTED").cast('timestamp').alias("DATE-REPORTED"),
    F.col("unnested_value.OWNERSHIP-IND").alias("OWNERSHIP-IND"),
    F.col("unnested_value.ACCOUNT-STATUS").alias("ACCOUNT-STATUS"),
    F.col("unnested_value.DISBURSED-DT").cast('timestamp').alias("DISBURSED-DT"),
    F.col("unnested_value.CLOSE-DT").cast('timestamp').alias("CLOSE-DT"),
    F.col("unnested_value.LAST-PAYMENT-DATE").cast('timestamp').alias("LAST-PAYMENT-DATE"),
    F.col("unnested_value.CREDIT-LIMIT/SANC AMT").alias("CREDIT-LIMIT/SANC AMT"),
    F.col("unnested_value.DISBURSED-AMT/HIGH CREDIT").alias("DISBURSED-AMT/HIGH CREDIT"),
    F.col("unnested_value.INSTALLMENT-AMT").alias("INSTALLMENT-AMT"),
    F.col("unnested_value.CURRENT-BAL").alias("CURRENT-BAL"),
    F.col("unnested_value.INSTALLMENT-FREQUENCY").alias("INSTALLMENT-FREQUENCY"),
    F.col("unnested_value.OVERDUE-AMT").alias("OVERDUE-AMT"),
    F.col("unnested_value.WRITE-OFF-AMT").cast('int').alias("WRITE-OFF-AMT"),
    F.col("unnested_value.ASSET_CLASS").alias("ASSET_CLASS"),
    F.col("unnested_value.REPORTED DATE - HIST").alias("REPORTED DATE - HIST"),
    F.col("unnested_value.DPD - HIST").alias("DPD - HIST"),
    F.col("unnested_value.CUR BAL - HIST").alias("CUR BAL - HIST"),
    F.col("unnested_value.AMT OVERDUE - HIST").alias("AMT OVERDUE - HIST"),
    F.col("unnested_value.AMT PAID - HIST").alias("AMT PAID - HIST"),
    F.col("unnested_value.TENURE").alias("TENURE"),
    F.col("unnested_value.ts").alias("ts")
)

# Checking the schema of the formatted bureau data
bureau_df_formatted.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR: string (nullable = true)
 |-- MATCH-TYPE: string (nullable = true)
 |-- ACCT-TYPE: string (nullable = true)
 |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |-- DATE-REPORTED: timestamp (nullable = true)
 |-- OWNERSHIP-IND: string (nullable = true)
 |-- ACCOUNT-STATUS: string (nullable = true)
 |-- DISBURSED-DT: timestamp (nullable = true)
 |-- CLOSE-DT: timestamp (nullable = true)
 |-- LAST-PAYMENT-DATE: timestamp (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |-- INSTALLMENT-AMT: string (nullable = true)
 |-- CURRENT-BAL: string (nullable = true)
 |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |-- OVERDUE-AMT: string (nullable = true)
 |-- WRITE-OFF-AMT: integer (nullable = true)
 |-- ASSET_CLASS: string (nullable = true)
 |-- REPORTED DATE - HIST: string (nullable = true)
 |-- DPD - HIST: string (nullable = true)
 |-- CUR BAL - HIST: string (nullable = true)

In the final step of setting up the bureau data, we need to change the datatype of some of its columns to integer. However, since they also contain non-numeric characters, we must remove them before converting the datatype of these columns. The columns to be converted are:

* `DISBURSED-AMT/HIGH CREDIT`
* `CURRENT-BAL`
* `OVERDUE-AMT`
* `INSTALLMENT-AMT`

We reuse the cleaning function that was implemented in 2A.

In [15]:
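# Defining a UDF that discards every non-digit character (minus signs and decimal points included);
# null or empty input becomes an empty string, which the later integer cast turns into null
udf_num = F.udf(lambda a: ''.join(b for b in a if b.isdigit()) if a else '')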
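A Python UDF forces each value to be shipped between the JVM and a Python worker, so a built-in expression such as `F.regexp_replace(F.col(c), "[^0-9]", "")` is typically faster. The plain-Python sketch below compares the UDF's logic with that regex to show that they agree on ASCII input. It also shows a caveat of both: a minus sign or decimal point is stripped, so `"-500"` becomes `500` and `"12.50"` becomes `1250`. Whether such values occur in the bureau data is an assumption to check.

```python
import re


def udf_body(a):
    # Same expression as the lambda wrapped by udf_num.
    return ''.join(b for b in a if b.isdigit()) if a else ''


NON_DIGITS = re.compile(r"[^0-9]")


def regexp_body(a):
    # Models F.regexp_replace(col, "[^0-9]", ""): a null input stays null.
    return None if a is None else NON_DIGITS.sub("", a)
```

The only difference is for null input: the UDF returns an empty string and the regex keeps null. Both become null after the subsequent integer cast.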

In [16]:
# Applying the function to the disbursed amount column
bureau_df_formatted = bureau_df_formatted.withColumn('DISBURSED-AMT/HIGH CREDIT', udf_num('DISBURSED-AMT/HIGH CREDIT'))

# Converting the column data type to integer
bureau_df_formatted = bureau_df_formatted.withColumn("DISBURSED-AMT/HIGH CREDIT", 
                                 bureau_df_formatted["DISBURSED-AMT/HIGH CREDIT"].cast(IntegerType()))

In [17]:
# Applying the transformation function to the current balance column
bureau_df_formatted = bureau_df_formatted.withColumn('CURRENT-BAL', udf_num('CURRENT-BAL'))

# Converting the column data type to integer
bureau_df_formatted = bureau_df_formatted.withColumn("CURRENT-BAL", 
                                 bureau_df_formatted["CURRENT-BAL"].cast(IntegerType()))

In [18]:
# Applying the transformation function to the overdue amount column
bureau_df_formatted = bureau_df_formatted.withColumn('OVERDUE-AMT', udf_num('OVERDUE-AMT'))

# Converting the column data type to integer 
bureau_df_formatted = bureau_df_formatted.withColumn("OVERDUE-AMT", 
                                 bureau_df_formatted["OVERDUE-AMT"].cast(IntegerType()))

In [19]:
# Applying the transformation function to the installment amount column
bureau_df_formatted = bureau_df_formatted.withColumn("INSTALLMENT-AMT", udf_num('INSTALLMENT-AMT'))

# Updating the datatype for the installment amount column
bureau_df_formatted = bureau_df_formatted.withColumn("INSTALLMENT-AMT", 
                                                     bureau_df_formatted["INSTALLMENT-AMT"].cast(IntegerType()))
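The four cells above repeat the same two steps for each column. In PySpark they could be collapsed into a loop such as `for c in cols: bureau_df_formatted = bureau_df_formatted.withColumn(c, udf_num(c).cast(IntegerType()))`. The plain-Python model below (hypothetical helper names, not Spark code) shows what that loop does to a single row: digits are kept, and empty or missing values end up as null.

```python
# Columns cleaned in the cells above.
NUMERIC_TEXT_COLUMNS = [
    "DISBURSED-AMT/HIGH CREDIT", "CURRENT-BAL", "OVERDUE-AMT", "INSTALLMENT-AMT",
]


def strip_non_digits(value):
    # Same logic as the notebook's udf_num.
    return ''.join(ch for ch in value if ch.isdigit()) if value else ''


def to_int_or_null(text):
    # Approximates Spark's cast to IntegerType for digit-only strings:
    # an empty string becomes null (None).
    return int(text) if text else None


def clean_row(row, columns=NUMERIC_TEXT_COLUMNS):
    """Return a copy of `row` with each listed column stripped and cast."""
    cleaned = dict(row)
    for column in columns:
        cleaned[column] = to_int_or_null(strip_non_digits(row.get(column)))
    return cleaned
```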

In [20]:
# Checking the schema of the formatted bureau data
bureau_df_formatted.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR: string (nullable = true)
 |-- MATCH-TYPE: string (nullable = true)
 |-- ACCT-TYPE: string (nullable = true)
 |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |-- DATE-REPORTED: timestamp (nullable = true)
 |-- OWNERSHIP-IND: string (nullable = true)
 |-- ACCOUNT-STATUS: string (nullable = true)
 |-- DISBURSED-DT: timestamp (nullable = true)
 |-- CLOSE-DT: timestamp (nullable = true)
 |-- LAST-PAYMENT-DATE: timestamp (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT: integer (nullable = true)
 |-- INSTALLMENT-AMT: integer (nullable = true)
 |-- CURRENT-BAL: integer (nullable = true)
 |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |-- OVERDUE-AMT: integer (nullable = true)
 |-- WRITE-OFF-AMT: integer (nullable = true)
 |-- ASSET_CLASS: string (nullable = true)
 |-- REPORTED DATE - HIST: string (nullable = true)
 |-- DPD - HIST: string (nullable = true)
 |-- CUR BAL - HIST: string (nullable = true)

From the schema, it looks like the formatting process was a success. Now, let us use `writeStream` to print the stream data to the console to confirm that there are no issues. Once again, let us limit the output to some of the important columns that we just converted in the previous step.

In [32]:
# Checking the formatted bureau data
query = bureau_df_formatted.select('ID', 'ts', 'OVERDUE-AMT', 'DISBURSED-AMT/HIGH CREDIT')\
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------------+-----------+-------------------------+
| ID|                 ts|OVERDUE-AMT|DISBURSED-AMT/HIGH CREDIT|
+---+-------------------+-----------+-------------------------+
|  1|2022-10-13 21:58:18|       null|                    44000|
|  1|2022-10-13 21:58:18|      37873|                    37352|
|  1|2022-10-13 21:58:18|          0|                   400000|
|  1|2022-10-13 21:58:18|          0|                   500000|
|  1|2022-10-13 21:58:18|          0|                   145000|
|  1|2022-10-13 21:58:18|       null|                        0|
|  1|2022-10-13 21:58:18|          0|                   300000|
|  1|2022-10-13 21:58:18|       null|                   500000|
|  1|2022-10-13 21:58:18|          0|                   275000|
|  2|2022-10-13 21:58:18|          0|                   450000|
|  2|2022-10-13 21:58:18|          0|                   354176|
|  2|20

In [33]:
# Stopping the query
query.stop()

#### 2.3.3 Watermarking

In the final step of our data preparation, we apply a watermark to both streams. Watermarking is Spark's mechanism for handling data that arrives late. Spark tracks the maximum event time seen so far and sets the watermark to that time minus a delay threshold. Late rows whose event time is still within the threshold are included in the stateful aggregations, while state for older event times can be discarded and rows arriving after that point may be dropped. This bounds the amount of state Spark must keep in memory. The watermark is set with `withWatermark()`, which takes the name of an event-time column (here `ts`) and the delay threshold, which we set to 5 seconds.
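To make the watermark rule concrete, here is a simplified plain-Python model, not Spark's implementation, of how a 5-second watermark advances and roughly when an aggregation window can be finalised in append mode. The helper names are hypothetical, and the model ignores details such as combining watermarks from several input streams.

```python
from datetime import datetime, timedelta

DELAY = timedelta(seconds=5)  # matches withWatermark("ts", "5 seconds")


def advance_watermark(current_watermark, batch_event_times, delay=DELAY):
    """Watermark after a micro-batch: max event time seen minus the delay.

    Spark computes it at the end of each micro-batch and uses it in the next;
    the watermark never moves backwards.
    """
    if not batch_event_times:
        return current_watermark
    candidate = max(batch_event_times) - delay
    if current_watermark is None:
        return candidate
    return max(current_watermark, candidate)


def window_is_final(window_end, watermark):
    # Simplified rule: once the watermark reaches a window's end, its result can be
    # emitted in append mode and its state dropped; later rows for it are not aggregated.
    return watermark is not None and window_end <= watermark
```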

In [21]:
# Watermarking the customer stream
watermarked_customer = customer_df_formatted.withWatermark("ts", "5 seconds")

# Watermarking the bureau stream
watermarked_bureau = bureau_df_formatted.withWatermark("ts", "5 seconds")

### 2.4 Grouping the Bureau Data

The following steps repeat some of the transformations applied to the bureau data in 2A. The main operation aggregates the numeric and non-numeric columns of the bureau data separately.

#### 2.4.1 Transforming `SELF-INDICATOR`

In the first step, we transform the values of the column `SELF-INDICATOR` from `TRUE`/`FALSE` to `1`/`0` respectively.
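The rename round-trip in the next cells is only needed because attribute access such as `watermarked_bureau.self` cannot express a hyphenated name; `F.col("SELF-INDICATOR")` can be used directly in the `F.when()` chain instead. The plain-Python sketch below (hypothetical helper name) models what the `when`/`otherwise` chain followed by the integer cast produces for each input. Note that the string comparison is case-sensitive, so a value such as `"true"` would end up as null. Whether the producer sends upper-case strings is an assumption worth verifying.

```python
def self_indicator_to_int(value):
    """Model of when('TRUE', 1).when('FALSE', 0).otherwise(value) followed by cast('int')."""
    # Spark coerces the mixed branches to a common string type before the cast.
    mapped = {"TRUE": "1", "FALSE": "0"}.get(value, value)
    # The cast turns anything that is not an integer string into null (None).
    try:
        return int(mapped)
    except (TypeError, ValueError):
        return None
```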

In [22]:
# Temporarily renaming the column for convenience of running the next query
watermarked_bureau = watermarked_bureau.withColumnRenamed('SELF-INDICATOR', 'self')

In [23]:
# Transforming the values of SELF-INDICATOR
watermarked_bureau = watermarked_bureau.withColumn("self", F.when(watermarked_bureau.self == 'TRUE', 1) \
      .when(watermarked_bureau.self == 'FALSE', 0) \
      .otherwise(watermarked_bureau.self))

In [24]:
# Reverting to the original name of the column
watermarked_bureau = watermarked_bureau.withColumnRenamed('self', 'SELF-INDICATOR')

In [25]:
# Changing the datatype of the column to integer
watermarked_bureau = watermarked_bureau.withColumn("SELF-INDICATOR", F.col("SELF-INDICATOR").cast(IntegerType()))

In [26]:
# Checking the schema of the transformed column
watermarked_bureau.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR: integer (nullable = true)
 |-- MATCH-TYPE: string (nullable = true)
 |-- ACCT-TYPE: string (nullable = true)
 |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |-- DATE-REPORTED: timestamp (nullable = true)
 |-- OWNERSHIP-IND: string (nullable = true)
 |-- ACCOUNT-STATUS: string (nullable = true)
 |-- DISBURSED-DT: timestamp (nullable = true)
 |-- CLOSE-DT: timestamp (nullable = true)
 |-- LAST-PAYMENT-DATE: timestamp (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT: integer (nullable = true)
 |-- INSTALLMENT-AMT: integer (nullable = true)
 |-- CURRENT-BAL: integer (nullable = true)
 |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |-- OVERDUE-AMT: integer (nullable = true)
 |-- WRITE-OFF-AMT: integer (nullable = true)
 |-- ASSET_CLASS: string (nullable = true)
 |-- REPORTED DATE - HIST: string (nullable = true)
 |-- DPD - HIST: string (nullable = true)
 |-- CUR BAL - HIST: string (nullable = true)

#### 2.4.2 Grouping the Bureau Data

We use the same grouping operation from 2A to aggregate the numeric and non-numeric columns of the bureau data: numeric columns are summed and given the suffix `_sum`, while non-numeric columns are aggregated by their distinct counts and given the suffix `_dist`. There are three differences from the code in 2A:

* First, all columns must be aggregated in a single `agg()` call, because Structured Streaming does not support multiple chained streaming aggregations.
* For the non-numeric columns, we use `approx_count_distinct` instead of `countDistinct`, as Structured Streaming does not support the latter.
* Finally, we group by a 30-second window on `ts` as well as by `ID`, which places the aggregated rows into time-bounded intervals.
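`F.window(ts, "30 seconds")` creates tumbling windows that, by default, are aligned to the Unix epoch and include their start but exclude their end. A plain-Python sketch of that assignment, with a hypothetical helper name and assuming UTC timestamps (the session time zone set earlier), looks like this:

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(seconds=30)  # matches F.window(ts, "30 seconds")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, size=WINDOW):
    """Return (start, end) of the tumbling window containing ts.

    Windows are aligned to the Unix epoch, start-inclusive and end-exclusive.
    """
    offset = (ts - EPOCH) % size
    start = ts - offset
    return start, start + size
```

For instance, every row stamped `2022-10-13 21:58:18` in the console output above would fall into the window from `21:58:00` to `21:58:30`.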

In [27]:
# Creating a new dataframe with, for each 30-second window and ID, the sum of every numeric column
# and the approximate distinct count of every non-numeric column.
bureau_groups = watermarked_bureau.groupBy(F.window(watermarked_bureau.ts, "30 seconds"), F.col("ID")).agg(
    # Numeric (integer or double) columns are summed, except the ID grouping key
    *(F.sum(F.col(column.name)).alias(column.name + '_sum')
      for column in watermarked_bureau.schema.fields
      if column.name != 'ID' and isinstance(column.dataType, (IntegerType, DoubleType))),
    # All remaining columns except ID and ts get an approximate distinct count
    *(F.approx_count_distinct(F.col(column.name)).alias(column.name + '_dist')
      for column in watermarked_bureau.schema.fields
      if column.name not in ('ID', 'ts') and not isinstance(column.dataType, (IntegerType, DoubleType)))
)

In [28]:
# Checking the schema of the bureau grouped data
bureau_groups.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- CREDIT-LIMIT/SANC AMT_dist: long (nullable = false)
 |-- INSTALLMENT-FREQUENCY_dist: long (nullable 

### 2.5 Unnesting the Window Columns and Joining the Streams

We are going to perform two operations in this step.

* Firstly, we need to unnest the window struct into two columns called `window_start` and `window_end` respectively.
* Secondly, we will join the grouped bureau data with the customer data based on the ID column.

#### 2.5.1 Unnesting the Window Columns

We can unnest the window struct into the window start and end columns simply by using the `select()`, `col()` and `alias()` functions as follows:

In [29]:
# Extracting the window start and end columns from the window struct
bureau_groups = bureau_groups.select(*bureau_groups.columns[1:],
                                     F.col("window.start").alias("window_start"),
                                     F.col("window.end").alias("window_end"))

In [30]:
# Printing the final schema for bureau groups
bureau_groups.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- CREDIT-LIMIT/SANC AMT_dist: long (nullable = false)
 |-- INSTALLMENT-FREQUENCY_dist: long (nullable = false)
 |-- ASSET_CLASS_dist: long (nullable = false)
 |-- REPORTED DATE - HIST_dist: long (nullable = false)
 |-- DPD - HIST

Let us use `writeStream` with the console sink to check a few columns of the grouped data.

In [66]:
# Checking the formatted bureau grouped data
query = bureau_groups.select('ID', 'DISBURSED-AMT/HIGH CREDIT_sum', 'DATE-REPORTED_dist', 
                             'window_start', 'window_end')\
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----------------------------+------------------+------------+----------+
| ID|DISBURSED-AMT/HIGH CREDIT_sum|DATE-REPORTED_dist|window_start|window_end|
+---+-----------------------------+------------------+------------+----------+
+---+-----------------------------+------------------+------------+----------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----------------------------+------------------+-------------------+-------------------+
|  ID|DISBURSED-AMT/HIGH CREDIT_sum|DATE-REPORTED_dist|       window_start|         window_end|
+----+-----------------------------+------------------+-------------------+-------------------+
| 811|                      1340000|                 5|2022-10-14 06:03:00|2022-10-14 06:03:30|
|2811|                      1129000|                 3|2022-10-15 03:00:00|2022-10-15 03:00:30|
|  57|                     70734744|                33|2022-10-14 06:28:00|2022-10-14 06:28:30|
|2660|                      1130000|                 3|2022-10-15 04:39:00|2022-10-15 04:39:30|
|5519|                      3552611|                11|2022-10-14 04:19:30|2022-10-14 04:20:00|
|1579|                       250000|                 1|2022-10-17 01:40:30|2022-10-17 01:41:00|
| 389|                      4090000|   



In [68]:
# Stopping the stream
query.stop()

#### 2.5.2 Joining the Streams

We will join the grouped bureau stream with the customer stream using Spark's `join()` function, with `expr()` specifying the join condition. Besides matching on ID, we only keep those joined rows where the timestamp `ts` of the customer row lies between the `window_start` and `window_end` of the corresponding bureau group. To express both conditions in a single SQL expression, we give each side an `alias()`: `bu` for the bureau groups and `cust` for the customers.

First, let us create a copy of the customer stream with its ID column renamed to `ID2`, so that the duplicate ID column can be dropped easily from the joined data.
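The join condition can be illustrated with a plain-Python nested-loop sketch over hypothetical rows. It is not how Spark executes a stream-stream join (Spark joins incrementally and relies on the watermarks to limit how much state it keeps, which this sketch does not model); it only shows which row pairs the condition accepts. One detail worth noticing: `BETWEEN` is inclusive at both ends, whereas the windows produced by `F.window` are half-open, so a customer event whose `ts` falls exactly on a window boundary can match two adjacent bureau windows.

```python
from datetime import datetime


def interval_join(bureau_groups, customers):
    """Inner join on ID where the customer ts lies in [window_start, window_end], inclusive like SQL BETWEEN.

    Customer rows carry their ID as "ID2", mirroring the rename in the notebook; it is dropped after the join.
    """
    joined = []
    for bu in bureau_groups:
        for cust in customers:
            if cust["ID2"] == bu["ID"] and bu["window_start"] <= cust["ts"] <= bu["window_end"]:
                cust_cols = {k: v for k, v in cust.items() if k != "ID2"}
                joined.append({**bu, **cust_cols})
    return joined


# Hypothetical data: two consecutive 30-second windows for the same ID
bureau = [
    {"ID": "811", "window_start": datetime(2022, 10, 14, 6, 3, 0),
     "window_end": datetime(2022, 10, 14, 6, 3, 30), "CURRENT-BAL_sum": 150},
    {"ID": "811", "window_start": datetime(2022, 10, 14, 6, 3, 30),
     "window_end": datetime(2022, 10, 14, 6, 4, 0), "CURRENT-BAL_sum": 7},
]
customers = [
    {"ID2": "811", "ts": datetime(2022, 10, 14, 6, 3, 10), "State": "PUNJAB"},
    {"ID2": "811", "ts": datetime(2022, 10, 14, 6, 3, 30), "State": "PUNJAB"},  # exactly on a window boundary
    {"ID2": "57", "ts": datetime(2022, 10, 14, 6, 3, 10), "State": "ORISSA"},   # no matching bureau group
]
joined = interval_join(bureau, customers)
for row in joined:
    print(row["ID"], row["window_start"].time(), row["ts"].time(), row["CURRENT-BAL_sum"])
```

The boundary event at 06:03:30 is joined to both windows, so if such duplicates matter, a half-open condition (`cust.ts >= bu.window_start AND cust.ts < bu.window_end`) may be worth considering.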

In [31]:
# Renaming the ID column in the customer dataset for conveniently dropping it from the joined data
customer_new = watermarked_customer.withColumnRenamed("ID","ID2")

In [32]:
# Joining the customer and bureau streams
cust_bureau_df = bureau_groups.alias('bu').join(customer_new.alias('cust'), on = F.expr("""
cust.ID2 = bu.ID AND
cust.ts BETWEEN bu.window_start AND bu.window_end
"""), how = "inner").drop('ID2')

In [33]:
# Checking the schema of the final joined stream
cust_bureau_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- CREDIT-LIMIT/SANC AMT_dist: long (nullable = false)
 |-- INSTALLMENT-FREQUENCY_dist: long (nullable = false)
 |-- ASSET_CLASS_dist: long (nullable = false)
 |-- REPORTED DATE - HIST_dist: long (nullable = false)
 |-- DPD - HIST

### 2.6 Writing to Parquet

Parquet is a columnar file format that Spark supports natively. Parquet files store the schema of the data alongside the data itself, so it is preserved automatically (Parquet Files - Spark 3.1.2 Documentation, n.d.). However, unlike CSV or text files, Parquet files are binary and cannot be read directly in a text editor; they have to be loaded with a Parquet-aware reader such as Spark.

For the customer bureau stream created above, we can use `writeStream` to persist the stream data in Parquet format. We will also specify a checkpoint location, where Spark records the progress of the query (such as the processed offsets and any intermediate state) for fault tolerance, so that a restarted query can resume where it left off. Let us write some data in Parquet format and stop the query once sufficient data has been persisted. The columns to be selected for persisting to Parquet are:

* `ID`
* `window_start`
* `window_end`
* `ts`
* `Top-up Month` which will be renamed to `Top-up_Month`.

In [76]:
# Writing the data into parquet format
query_file_sink = cust_bureau_df.select('ID', 'window_start', 'window_end', 'ts', 'Top-up Month')\
                                .withColumnRenamed('Top-up Month', 'Top-up_Month')\
                                .writeStream.format("parquet")\
                                .outputMode("append")\
                                .option("path", "parquet/cust_bureau_df")\
                                .option("checkpointLocation", "parquet/cust_bureau_df/checkpoint")\
                                .start()



In [77]:
# Stopping the file_sink query
query_file_sink.stop()



### 2.7 Pipeline Model Transformation and Parquet Persistence

This section is a continuation of the activities performed in 2A, where we successfully persisted a machine learning pipeline model. In this section, we are going to apply that persisted pipeline model to the customer bureau stream. We need to perform two actions here:

* Transform the data with the loaded model pipeline.
* Persist the model predictions in parquet format.

#### 2.7.1 Making Predictions

Firstly, let us import the `PipelineModel` class from the `pyspark.ml` library.

In [34]:
# Importing the required libraries
from pyspark.ml import PipelineModel

Next, we can load the persisted pipeline model using the `PipelineModel.load()` method.

In [35]:
# Loading the pipeline model
gradient_booster_fit = PipelineModel.load("topup_pipeline_model")

                                                                                

Unlike the data we dealt with in 2A, the new customer bureau data may contain null values. Therefore, we must set the model's null-handling option (`handleInvalid`) to `keep`. This option belongs to the `VectorAssembler`, which is the second-to-last stage of the pipeline (`stages[-2]`).

In [36]:
# Adjusting the loaded model to handle NULL values
gradient_booster_fit.stages[-2].setHandleInvalid("keep")

VectorAssembler_ac48f700b95d
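Spark documents three `handleInvalid` options for `VectorAssembler`: `error` (throw an error), `skip` (filter out rows that contain null or NaN values) and `keep` (output NaN in their place). The toy function below mimics those semantics in plain Python so that the effect of `keep` is visible; it is a conceptual sketch rather than Spark's code, and the function, row and column names are hypothetical.

```python
import math

VALID_OPTIONS = ("error", "skip", "keep")


def assemble(row, input_cols, handle_invalid="error"):
    """Toy VectorAssembler: return the row's input values as a list of floats,
    or None when the row is dropped under handle_invalid="skip"."""
    if handle_invalid not in VALID_OPTIONS:
        raise ValueError(f"handle_invalid must be one of {VALID_OPTIONS}")
    features = []
    for col in input_cols:
        value = row.get(col)
        invalid = value is None or (isinstance(value, float) and math.isnan(value))
        if not invalid:
            features.append(float(value))
        elif handle_invalid == "error":
            raise ValueError(f"Invalid (null/NaN) value in column {col!r}")
        elif handle_invalid == "skip":
            return None  # the whole row is filtered out
        else:
            features.append(float("nan"))  # "keep": the missing value becomes NaN
    return features


# Hypothetical aggregated row containing a null from the stream
row = {"CURRENT-BAL_sum": 150, "OVERDUE-AMT_sum": None}
cols = ["CURRENT-BAL_sum", "OVERDUE-AMT_sum"]
print(assemble(row, cols, "keep"))
print(assemble(row, cols, "skip"))
```

With `keep`, a row containing nulls still reaches the model instead of failing the batch (`error`) or silently disappearing (`skip`).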

We can now use the familiar `transform()` function to apply the model to the customer bureau stream and generate the predictions.

In [37]:
# Applying the fitted model to the customer bureau stream
gb_pred = gradient_booster_fit.transform(cust_bureau_df)

In [38]:
# Printing the schema of the predicted stream
gb_pred.printSchema()

root
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- CREDIT-LIMIT/SANC AMT_dist: long (nullable = false)
 |-- INSTALLMENT-FREQUENCY_dist: long (nullable = false)
 |-- ASSET_CLASS_dist: long (nullable = false)
 |-- REPORTED DATE - HIST_dist: long (nullable = false)
 |-- DPD - HIST

#### 2.7.2 Persisting the Predictions

The schema of the transformed stream shows that the model's output columns have been added. However, let us confirm that the prediction values themselves look reasonable by writing them to the console with `writeStream`. We will display only the columns that we intend to write to the Parquet file.

In [83]:
# Checking the prediction data
query = gb_pred.withColumnRenamed('Top-up Month', 'Top-up_Month')\
               .select('ID', 'window_start', 'window_end', 'ts', 'prediction', 'Top-up_Month')\
               .writeStream \
               .outputMode("append") \
               .format("console") \
               .start()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+------------+----------+---+----------+------------+
| ID|window_start|window_end| ts|prediction|Top-up_Month|
+---+------------+----------+---+----------+------------+
+---+------------+----------+---+----------+------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-------------------+-------------------+-------------------+----------+-----------------+
|  ID|       window_start|         window_end|                 ts|prediction|     Top-up_Month|
+----+-------------------+-------------------+-------------------+----------+-----------------+
|3414|2022-10-15 04:52:30|2022-10-15 04:53:00|2022-10-15 04:52:53|       1.0|No Top-up Service|
|1159|2022-10-14 06:06:00|2022-10-14 06:06:30|2022-10-14 06:06:10|       0.0|No Top-up Service|
|1436|2022-10-15 02:50:30|2022-10-15 02:51:00|2022-10-15 02:50:43|       0.0|No Top-up Service|
| 675|2022-10-15 09:33:30|2022-10-15 09:34:00|2022-10-15 09:33:41|       0.0|No Top-up Service|
| 675|2022-10-15 08:53:30|2022-10-15 08:54:00|2022-10-15 08:53:47|       0.0|No Top-up Service|
|1159|2022-10-14 05:37:30|2022-10-14 05:38:00|2022-10-14 05:37:38|       0.0|No Top-up Service|
|4032|2022-10-14 04:05:30|2022-10-14 04



In [84]:
# Stopping the console query
query.stop()



The predictions are being generated as expected, so we can go ahead and persist them in Parquet format. As in section 2.6, we will rename the column `Top-up Month` to `Top-up_Month`, because Spark 3.0's Parquet writer rejects column names that contain spaces. As before, we will let a reasonable number of Parquet files be written before stopping the query.

In [85]:
# Persisting the prediction data in parquet format
query_file_sink = gb_pred.withColumnRenamed('Top-up Month', 'Top-up_Month')\
                         .select('ID', 'window_start', 'window_end', 'ts', 'prediction', 'Top-up_Month')\
                         .writeStream.format("parquet")\
                         .outputMode("append")\
                         .option("path", "./parquet/top_up_predictions")\
                         .option("checkpointLocation", "./parquet/top_up_predictions/checkpoint")\
                         .start()



In [86]:
# Stop the file_sink query
query_file_sink.stop()



### 2.8 Target Customers

We have confirmed that we can successfully generate predictions for the data. In this section, we focus on the target customers, i.e. the rows where the top-up prediction is 1. We want to know how many target customers come from each state, and we need to publish this information to a Kafka topic for the consumer.

Therefore, we will group the predictions by State and count them. Since there are several moving parts here, let us define a function that is applied to each micro-batch of the customer bureau stream and sends the result to the consumer. The function follows these steps:

* Check if the current customer bureau batch is empty. If it is empty then skip to the next batch.
* If the batch is not empty, transform it with the pipeline model and generate the predictions.
* Initialise a new Kafka producer on the same bootstrap server, with a `value_serializer` that converts the data into bytes and a `key_serializer` that does the same for the key.
* Filter the predictions by target customers, group them by state and count them.
* Check if the grouping of the current batch is empty. If it is empty, skip to the next batch.
* Find the latest `window_end` timestamp in the batch, to be used as the message key.
* Convert the aggregated prediction data to JSON in preparation for publishing.
* Publish the data to the Kafka topic for the consumer.
* Print the published data.
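The per-batch steps above can be sketched in plain Python, with a list of dictionaries standing in for a micro-batch DataFrame. `summarise_batch` is a hypothetical helper, not part of the notebook, and it stops short of sending anything to Kafka. It does reproduce the notebook's encoding, which highlights one consequence: `stream_data` is already a JSON string, and the producer's `value_serializer` calls `dumps()` on it again, so the consumer receives a JSON-encoded string and has to decode it twice.

```python
import json
from collections import Counter
from datetime import datetime


def summarise_batch(predictions):
    """Return (key_bytes, value_bytes) for one micro-batch, or None when nothing should be published."""
    if not predictions:
        return None  # empty micro-batch: skip
    target_counts = Counter(row["State"] for row in predictions if row["prediction"] == 1.0)
    if not target_counts:
        return None  # no target customers in this batch: skip
    # Latest window_end in the batch, used as the message key
    end_ts = str(max(row["window_end"] for row in predictions))
    # Record-oriented JSON string, similar in shape to pandas' to_json(orient="records")
    stream_data = json.dumps(
        [{"State": state, "target_customers": n} for state, n in target_counts.items()],
        separators=(",", ":"),
    )
    # Same encoding as the notebook's serializers: dumps() the value/key, then encode as ASCII
    value_bytes = json.dumps(stream_data).encode("ascii")
    key_bytes = json.dumps(end_ts).encode("ascii")
    return key_bytes, value_bytes


# Hypothetical micro-batch of predictions
batch = [
    {"State": "ORISSA", "prediction": 1.0, "window_end": datetime(2022, 10, 18, 13, 32)},
    {"State": "ORISSA", "prediction": 1.0, "window_end": datetime(2022, 10, 18, 13, 31, 30)},
    {"State": "PUNJAB", "prediction": 0.0, "window_end": datetime(2022, 10, 18, 13, 31)},
]
key, value = summarise_batch(batch)
# The value was already a JSON string before dumps(), so it is decoded twice here
print(json.loads(key), json.loads(json.loads(value)))
```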

In [37]:
# Defining a function to send the predicted customer data to the consumer
def prediction_stream(cust_bureau, epochID):
    
    # Checking if the current batch contains any rows
    if cust_bureau.count() == 0:

        print("Current batch is empty! Skipping..")

        # Skipping
        return
    
    # Opening a try block
    try:
        
        # Making predictions with the pipeline model
        predictionStream_df = gradient_booster_fit.transform(cust_bureau)

        prediction_topic = "Statewide_customer_count"
        
        # Initialising a Kafka producer to send the prediction data to the consumer
        prediction_producer = KafkaProducer(bootstrap_servers = ['localhost:9092'],
                                            value_serializer = lambda v: dumps(v).encode('ascii'),
                                            key_serializer = lambda k: dumps(k).encode('ascii'),
                                            api_version = (0, 10))
        
        # Isolating the predictions for the target customers and counting them per state
        state_count_df = predictionStream_df.filter("prediction = 1.0")\
                                            .groupBy("State").count()\
                                            .withColumnRenamed("count","target_customers")

        # Checking if the aggregated prediction data is empty
        if state_count_df.count() == 0:

            print("No update to customer counts in the current batch! Skipping..")

            # Skipping
            return

        else:

            # Applying the window_end value as the key for the current batch
            end_ts = str(predictionStream_df.agg({"window_end":"max"}).collect()[0][0])
        
            # Setting up the data to be streamed
            stream_data = state_count_df.toPandas().to_json(orient = "records")
        
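            # Publishing the streamed data, then flushing so the message is sent before the batch function returns
            prediction_producer.send(prediction_topic, value = stream_data, key = end_ts)
            prediction_producer.flush()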
        
            # Printing the data being streamed
            print(f"Epoch ID: {epochID}, Timestamp: {end_ts},  Data: {stream_data}")
        
    except Exception as ex:
        
        print(f"Exception observed in batch {epochID}: {ex}. Skipping this batch...")

We can use the `foreachBatch()` method of `writeStream` to apply the above function to each micro-batch of the customer bureau stream.

In [38]:
# Starting the producer to send the count of predicted top-up customers
query = cust_bureau_df.writeStream.foreachBatch(prediction_stream).outputMode("append").start()



Current batch is empty! Skipping..


                                                                                

Epoch ID: 1, Timestamp: 2022-10-18 14:56:00,  Data: [{"State":"WEST BENGAL","target_customers":494},{"State":"CHATTISGARH","target_customers":434},{"State":"RAJASTHAN","target_customers":294},{"State":"GUJARAT","target_customers":382},{"State":"KARNATAKA","target_customers":62},{"State":"ORISSA","target_customers":521},{"State":"UTTAR PRADESH","target_customers":170},{"State":"MADHYA PRADESH","target_customers":4161},{"State":"MAHARASHTRA","target_customers":34},{"State":"PUNJAB","target_customers":564},{"State":"UTTARAKHAND","target_customers":288},{"State":"HARYANA","target_customers":252},{"State":"TELANGANA","target_customers":15},{"State":"ANDHRA PRADESH","target_customers":454}]




In [45]:
# Stopping the query
query.stop()

                                                                                

Epoch ID: 2, Timestamp: 2022-10-18 13:32:00,  Data: [{"State":"CHATTISGARH","target_customers":3},{"State":"ORISSA","target_customers":12},{"State":"MADHYA PRADESH","target_customers":4}]


**NOTE**: The above output is incomplete on both the producer and consumer sides because the Monash University cloud VM is unable to handle the load of this operation and generate the map. I have tested the logic of the code that produces the map on my local system, so I am confident that it will not fail if the entire application (steps 1 to 3) is run on a system with more memory.

I strongly **recommend** running all three steps on a system with more memory, especially section 2.8 and Step 3 (Consumer).

## References

* Parquet Files - Spark 3.1.2 Documentation. (n.d.). Spark.apache.org. https://spark.apache.org/docs/latest/sql-data-sources-parquet.html