**Author name:** Thanh Chung Nguyen



## 2.1 Write code to SparkSession is created using a SparkConf object, which would use two local cores with a proper application name, and use UTC as the timezone.


In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col
from pyspark import SparkConf

master = "local[2]"
spark_conf = SparkConf().setMaster(master)

spark = SparkSession \
    .builder \
    .appName("Analysis in Spark") \
    .config("spark.sql.session.timeZone", 'UTC')\
    .config(conf=spark_conf)\
    .getOrCreate()


## 2.2 Use the same topic names from the Kafka producer in Task 1, ingest the streaming data into Spark Streaming and assume all data coming in String format.


In [2]:
topic_bu = "bureau"
df_bu = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic_bu) \
    .load()

In [3]:
topic_cus = "customer"
df_cus = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic_cus) \
    .load()

In [4]:
df_bu = df_bu.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df_cus = df_cus.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [5]:
schema_bu = ArrayType(StructType([    
    StructField('ID', StringType(), True), 
    StructField('SELF-INDICATOR', StringType(), True),
    StructField('MATCH-TYPE', StringType(), True), 
    StructField('ACCT-TYPE', StringType(), True),
    StructField('CONTRIBUTOR-TYPE', StringType(), True), 
    StructField('DATE-REPORTED', StringType(), True),
    StructField('OWNERSHIP-IND', StringType(), True), 
    StructField('ACCOUNT-STATUS', StringType(), True),
    StructField('DISBURSED-DT', StringType(), True), 
    StructField('CLOSE-DT', StringType(), True),
    StructField('LAST-PAYMENT-DATE', StringType(), True),
    StructField('CREDIT-LIMIT/SANC AMT', StringType(), True), 
    StructField('DISBURSED-AMT/HIGH CREDIT', StringType(), True),
    StructField('INSTALLMENT-AMT', StringType(), True), 
    StructField('CURRENT-BAL', StringType(), True),
    StructField('INSTALLMENT-FREQUENCY', StringType(), True), 
    StructField('OVERDUE-AMT', StringType(), True), 
    StructField('WRITE-OFF-AMT', StringType(), True),
    StructField('ASSET_CLASS', StringType(), True), 
    StructField('REPORTED DATE - HIST', StringType(), True),
    StructField('DPD - HIST', StringType(), True), 
    StructField('CUR BAL - HIST', StringType(), True),
    StructField('AMT OVERDUE - HIST', StringType(), True), 
    StructField('AMT PAID - HIST', StringType(), True),
    StructField('TENURE', StringType(), True), 
    StructField('ts', TimestampType(), True)            
]))

In [6]:
schema_cus = ArrayType(StructType([    
    StructField('ID', StringType(), True), 
    StructField('Frequency', StringType(), True),
    StructField('InstlmentMode', StringType(), True), 
    StructField('LoanStatus', StringType(), True),
    StructField('PaymentMode', StringType(), True), 
    StructField('BranchID', StringType(), True),
    StructField('Area', StringType(), True), 
    StructField('Tenure', StringType(), True),
    StructField('AssetCost', StringType(), True), 
    StructField('AmountFinance', StringType(), True),
    StructField('DisbursalAmount', StringType(), True),
    StructField('EMI', StringType(), True), 
    StructField('DisbursalDate', StringType(), True),
    StructField('MaturityDAte', StringType(), True), 
    StructField('AuthDate', StringType(), True),
    StructField('AssetID', StringType(), True), 
    StructField('ManufacturerID', StringType(), True), 
    StructField('SupplierID', StringType(), True),
    StructField('LTV', StringType(), True), 
    StructField('SEX', StringType(), True),
    StructField('AGE', StringType(), True), 
    StructField('MonthlyIncome', StringType(), True),
    StructField('City', StringType(), True), 
    StructField('State', StringType(), True),
    StructField('ZiPCODE', StringType(), True), 
    StructField('Top-up Month', StringType(), True),
    StructField('ts', TimestampType(), True)            
]))

In [7]:
df_bu =df_bu.select(F.from_json(F.col("value").cast("string"), schema_bu).alias('parsed_value'))
df_bu = df_bu.select(F.explode(F.col("parsed_value")).alias('unnested_value'))  

In [8]:
df_cus =df_cus.select(F.from_json(F.col("value").cast("string"), schema_cus).alias('parsed_value'))
df_cus = df_cus.select(F.explode(F.col("parsed_value")).alias('unnested_value'))  

In [9]:
df_bu.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- SELF-INDICATOR: string (nullable = true)
 |    |-- MATCH-TYPE: string (nullable = true)
 |    |-- ACCT-TYPE: string (nullable = true)
 |    |-- CONTRIBUTOR-TYPE: string (nullable = true)
 |    |-- DATE-REPORTED: string (nullable = true)
 |    |-- OWNERSHIP-IND: string (nullable = true)
 |    |-- ACCOUNT-STATUS: string (nullable = true)
 |    |-- DISBURSED-DT: string (nullable = true)
 |    |-- CLOSE-DT: string (nullable = true)
 |    |-- LAST-PAYMENT-DATE: string (nullable = true)
 |    |-- CREDIT-LIMIT/SANC AMT: string (nullable = true)
 |    |-- DISBURSED-AMT/HIGH CREDIT: string (nullable = true)
 |    |-- INSTALLMENT-AMT: string (nullable = true)
 |    |-- CURRENT-BAL: string (nullable = true)
 |    |-- INSTALLMENT-FREQUENCY: string (nullable = true)
 |    |-- OVERDUE-AMT: string (nullable = true)
 |    |-- WRITE-OFF-AMT: string (nullable = true)
 |    |-- ASSET_CLASS: string (nullabl

In [10]:
df_cus.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- ID: string (nullable = true)
 |    |-- Frequency: string (nullable = true)
 |    |-- InstlmentMode: string (nullable = true)
 |    |-- LoanStatus: string (nullable = true)
 |    |-- PaymentMode: string (nullable = true)
 |    |-- BranchID: string (nullable = true)
 |    |-- Area: string (nullable = true)
 |    |-- Tenure: string (nullable = true)
 |    |-- AssetCost: string (nullable = true)
 |    |-- AmountFinance: string (nullable = true)
 |    |-- DisbursalAmount: string (nullable = true)
 |    |-- EMI: string (nullable = true)
 |    |-- DisbursalDate: string (nullable = true)
 |    |-- MaturityDAte: string (nullable = true)
 |    |-- AuthDate: string (nullable = true)
 |    |-- AssetID: string (nullable = true)
 |    |-- ManufacturerID: string (nullable = true)
 |    |-- SupplierID: string (nullable = true)
 |    |-- LTV: string (nullable = true)
 |    |-- SEX: string (nullable = true)
 |    |-- AGE: string (nullable = tr

In [11]:
df_bu = df_bu.select(
                    F.col("unnested_value.ID").alias("ID"),
                    F.col("unnested_value.SELF-INDICATOR").alias("SELF-INDICATOR"),
                    F.col("unnested_value.MATCH-TYPE").alias("MATCH-TYPE"),
                    F.col("unnested_value.ACCT-TYPE").alias("ACCT-TYPE"),
                    F.col("unnested_value.CONTRIBUTOR-TYPE").alias("CONTRIBUTOR-TYPE"),
                    F.col("unnested_value.DATE-REPORTED").alias("DATE-REPORTED"),
                    F.col("unnested_value.OWNERSHIP-IND").alias("OWNERSHIP-IND"),
                    F.col("unnested_value.ACCOUNT-STATUS").alias("ACCOUNT-STATUS"),
                    F.col("unnested_value.DISBURSED-DT").alias("DISBURSED-DT"),
                    F.col("unnested_value.CLOSE-DT").alias("CLOSE-DT"),
                    F.col("unnested_value.LAST-PAYMENT-DATE").alias("LAST-PAYMENT-DATE"),
                    F.col("unnested_value.CREDIT-LIMIT/SANC AMT").alias("CREDIT-LIMIT/SANC AMT"),
                    F.col("unnested_value.DISBURSED-AMT/HIGH CREDIT").alias("DISBURSED-AMT/HIGH CREDIT"),
                    F.col("unnested_value.INSTALLMENT-AMT").alias("INSTALLMENT-AMT"),
                    F.col("unnested_value.CURRENT-BAL").alias("CURRENT-BAL"),
                    F.col("unnested_value.INSTALLMENT-FREQUENCY").alias("INSTALLMENT-FREQUENCY"),
                    F.col("unnested_value.OVERDUE-AMT").alias("OVERDUE-AMT"),
                    F.col("unnested_value.WRITE-OFF-AMT").alias("WRITE-OFF-AMT"),
                    F.col("unnested_value.ASSET_CLASS").alias("ASSET_CLASS"),
                    F.col("unnested_value.REPORTED DATE - HIST").alias("REPORTED DATE - HIST"),
                    F.col("unnested_value.DPD - HIST").alias("DPD - HIST"),
                    F.col("unnested_value.CUR BAL - HIST").alias("CUR BAL - HIST"),
                    F.col("unnested_value.AMT OVERDUE - HIST").alias("AMT OVERDUE - HIST"),
                    F.col("unnested_value.AMT PAID - HIST").alias("AMT PAID - HIST"),
                    F.col("unnested_value.TENURE").alias("TENURE"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [12]:
df_cus = df_cus.select(
                    F.col("unnested_value.ID").alias("ID"),
                    F.col("unnested_value.Frequency").alias("Frequency"),
                    F.col("unnested_value.InstlmentMode").alias("InstlmentMode"),
                    F.col("unnested_value.LoanStatus").alias("LoanStatus"),
                    F.col("unnested_value.PaymentMode").alias("PaymentMode"),
                    F.col("unnested_value.BranchID").alias("BranchID"),
                    F.col("unnested_value.Area").alias("Area"),
                    F.col("unnested_value.Tenure").alias("Tenure"),
                    F.col("unnested_value.AssetCost").alias("AssetCost"),
                    F.col("unnested_value.AmountFinance").alias("AmountFinance"),
                    F.col("unnested_value.DisbursalAmount").alias("DisbursalAmount"),
                    F.col("unnested_value.EMI").alias("EMI"),
                    F.col("unnested_value.DisbursalDate").alias("DisbursalDate"),
                    F.col("unnested_value.MaturityDAte").alias("MaturityDAte"),
                    F.col("unnested_value.AuthDate").alias("AuthDate"),
                    F.col("unnested_value.AssetID").alias("AssetID"),
                    F.col("unnested_value.ManufacturerID").alias("ManufacturerID"),
                    F.col("unnested_value.SupplierID").alias("SupplierID"),
                    F.col("unnested_value.LTV").alias("LTV"),
                    F.col("unnested_value.SEX").alias("SEX"),
                    F.col("unnested_value.AGE").alias("AGE"),
                    F.col("unnested_value.MonthlyIncome").alias("MonthlyIncome"),
                    F.col("unnested_value.City").alias("City"),
                    F.col("unnested_value.State").alias("State"),
                    F.col("unnested_value.ZiPCODE").alias("ZiPCODE"),
                    F.col("unnested_value.Top-up Month").alias("Top-up Month"),
                    F.col("unnested_value.ts").alias("ts")
                )

## 2.3 Then the streaming data format should be transformed into the proper formats following the metadata file schema, similar to assignment 2A. Then use 'ts' column as the watermark and set the delay threshold to 5 seconds.

**Remove comma**

In [13]:
comma_col = ['CREDIT-LIMIT/SANC AMT', 'DISBURSED-AMT/HIGH CREDIT', 'INSTALLMENT-AMT', 'CURRENT-BAL', 'OVERDUE-AMT']

In [14]:
def lookup_correct_id(value):
    
    # @param: value: contain row in 1 columns

    try:
        # We checck if value can replace comma
        value_reg = value.replace(",","")
        return value_reg
    except:
        # if not we return the same format
        return value


# create UDF 
convert_num_udf = udf(lookup_correct_id, StringType())

In [15]:
# loop the whole column
for cols in comma_col:
    
    # Apply UDF to change each column with comma values
    df_bu = df_bu.withColumn(cols, convert_num_udf(col(cols)))

**Remove month in INSTALLMENT-AMT**

In [16]:
from pyspark.sql.functions import regexp_replace
df_bu = df_bu.withColumn("INSTALLMENT-AMT",regexp_replace(col("INSTALLMENT-AMT"), "/[a-zA-Z]*",""))

**Cast specific column to integer and double type**

In [17]:
# cast type in df_bu
df_bu = df_bu.withColumn("ID",col("ID").cast(IntegerType())) \
                    .withColumn("CREDIT-LIMIT/SANC AMT",col("CREDIT-LIMIT/SANC AMT").cast(IntegerType())) \
                    .withColumn("DISBURSED-AMT/HIGH CREDIT",col("DISBURSED-AMT/HIGH CREDIT").cast(IntegerType())) \
                    .withColumn("INSTALLMENT-AMT",col("INSTALLMENT-AMT").cast(IntegerType())) \
                    .withColumn("CURRENT-BAL",col("CURRENT-BAL").cast(IntegerType())) \
                    .withColumn("OVERDUE-AMT",col("OVERDUE-AMT").cast(IntegerType())) \
                    .withColumn("WRITE-OFF-AMT",col("WRITE-OFF-AMT").cast(IntegerType())) \
                    .withColumn("TENURE",col("TENURE").cast(IntegerType())) 

In [18]:
# cast type in df_cus
df_cus = df_cus.withColumn("ID",col("ID").cast(IntegerType())) \
                    .withColumn("Tenure",col("Tenure").cast(IntegerType())) \
                    .withColumn("AssetCost",col("AssetCost").cast(IntegerType())) \
                    .withColumn("AmountFinance",col("AmountFinance").cast(DoubleType())) \
                    .withColumn("DisbursalAmount",col("DisbursalAmount").cast(DoubleType())) \
                    .withColumn("LTV",col("LTV").cast(DoubleType())) \
                    .withColumn("AGE",col("AGE").cast(IntegerType())) \
                    .withColumn("MonthlyIncome",col("MonthlyIncome").cast(DoubleType())) \
                    .withColumn("ZiPCODE",col("ZiPCODE").cast(IntegerType())) \
                    .withColumn("EMI",col("EMI").cast(DoubleType())) 

**use 'ts' column as the watermark and set the delay threshold to 5 seconds**

In [19]:
df_bu = df_bu \
    .withWatermark("ts", "5 seconds")

## 2.4 Group the bureau stream based on ID with 30 seconds window duration, similar to assignment 2A(same rule for sum and dist).
- Transform the “SELF-INDICATOR” column’s values. If the value is true, then convert to 1, if the value is false, then convert to 0.
- sum the rows for numeric type columns, count distinct values for other columns with other data types, and rename them with the postfix like '_sum' or '_dist'. (For example, we did the sum function based on the 'HIGH CREDIT', and the new column’s name will be 'HIGH CREDIT_sum').


**Transform SELF-INDICATOR**

In [20]:
from pyspark.sql.types import IntegerType
def convert_indicator(value):
    # @param: value: contain row in 1 columns
    try:
        # We checck if value equal true replace 1
        if value == "true":
            return 1
        else:
            return 0
        
    except:
        return 0

In [21]:
convert_indicator_udf = udf(convert_indicator, StringType())

In [22]:
df_bu = df_bu.withColumn("SELF-INDICATOR",convert_indicator_udf(col("SELF-INDICATOR")))

In [23]:
df_bu = df_bu.withColumn("SELF-INDICATOR", col("SELF-INDICATOR").cast(IntegerType()))

**Sum and count distinct values in each column**

In [24]:
column_avoid = ["ID", "Top-up Month"]

# column numeric for bureau data frame
bu_num_col = [item[0] for item in df_bu.dtypes if item[1].startswith('int') or item[1].startswith('double')]
bu_num_col = [item for item in bu_num_col if item not in column_avoid]

# column categories for bureau data frame
bu_str_col = [item[0] for item in df_bu.dtypes if item[1].startswith('string')]
bu_str_col = [item for item in bu_str_col if item not in column_avoid]

In [25]:
# create aggregation sum for numeric column in bureau dataframe
exprs_num = [sum(x) for x in bu_num_col]
exprs_num_name = [str(i) + "_sum" for i in bu_num_col]

# create aggregation countdistinct for numeric column in bureau dataframe
exprs_str = [approx_count_distinct(x) for x in bu_str_col]
exprs_str_name = [str(i) + "_dist" for i in bu_str_col]

In [26]:
df_bu = df_bu\
    .groupBy(window(df_bu.ts, "30 seconds"), "ID")\
    .agg(*exprs_num, *exprs_str)

In [27]:
exprs_name = ["window"] + ["ID"] + exprs_num_name + exprs_str_name
# change column name of group_sum_dist
count = 0
for field in df_bu.schema.fields:
    df_bu = df_bu.withColumnRenamed(field.name, exprs_name[count])
    count+=1

### 2.5 Create new columns named 'window_start' and 'window_end' which are the window’s start time and end time in 2.4. Then inner join the 2 streams based on 'ID', and only customer data received between the window time are accepted. For example, customer data ID '3' received at 10:00, and only when the window of corresponding bureau data contains 10:00(like window start: 9:59, end: 10:00), then this data is accepted.


**Select new column window_start and window_end**

In [28]:
df_bu = df_bu.select(F.col("window.start").alias("window_start"),
                F.col("window.end").alias("window_end"),
                F.col("ID"),
                F.col("CREDIT-LIMIT/SANC AMT_sum"),
                F.col("DISBURSED-AMT/HIGH CREDIT_sum"),
                F.col("INSTALLMENT-AMT_sum"),
                F.col("CURRENT-BAL_sum"),
                F.col("OVERDUE-AMT_sum"),
                F.col("WRITE-OFF-AMT_sum"),
            F.col("TENURE_sum"),
            F.col("SELF-INDICATOR_sum"),
            F.col("MATCH-TYPE_dist"),
            F.col("ACCT-TYPE_dist"),
            F.col("CONTRIBUTOR-TYPE_dist"),
            F.col("DATE-REPORTED_dist"),
            F.col("OWNERSHIP-IND_dist"),
            F.col("ACCOUNT-STATUS_dist"),
            F.col("DISBURSED-DT_dist"),
            F.col("CLOSE-DT_dist"),
            F.col("LAST-PAYMENT-DATE_dist"),
            F.col("INSTALLMENT-FREQUENCY_dist"),
            F.col("ASSET_CLASS_dist"),
            F.col("REPORTED DATE - HIST_dist"),
            F.col("DPD - HIST_dist"),
            F.col("CUR BAL - HIST_dist"),
            F.col("AMT OVERDUE - HIST_dist"),
            F.col("AMT PAID - HIST_dist"))

**Join 2 dataframe**

In [29]:
# change ID in df_bu
df_bu = df_bu.withColumnRenamed("ID", "ID1")

In [30]:
# join dataframe
df_join = df_bu.join(
  df_cus,
  expr("""
    ID = ID1 AND
    ts >= window_start AND
    ts <= window_end
    """),"inner"
)

# drop column ID1
df_join = df_join.drop(col("ID1"))

In [31]:
df_join.printSchema()

root
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: long (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: long (nullable = true)
 |-- INSTALLMENT-AMT_sum: long (nullable = true)
 |-- CURRENT-BAL_sum: long (nullable = true)
 |-- OVERDUE-AMT_sum: long (nullable = true)
 |-- WRITE-OFF-AMT_sum: long (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false)
 |-- INSTALLMENT-FREQUENCY_dist: long (nullable = false)
 |-- ASSET_CLASS_dist:

### 2.6 Persist the above result in parquet format.(When you save the data to parquet format,you need to rename “Top-up Month” to “Top-up_Month” first. And only keep these columns “ID”, “window_start”, “window_end”, “ts”, “Top-up_Month”) Renaming “Top-up Month” only happen in this question


In [32]:
# change name in top-up month
df_join = df_join.withColumnRenamed("Top-up Month", "Top-up_Month")

In [33]:
# drop na if have
df_join = df_join.na.drop()

In [34]:
# select column to save
df_join1 = df_join.select("ID", "window_start", "window_end","ts","Top-up_Month")

In [96]:
# start parquet
query_file_join = df_join1.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/df_join")\
        .option("checkpointLocation", "parquet/df_join/checkpoint")\
        .start()

In [97]:
# stop parquet
query_file_join.stop()

In [98]:
# Read the saved parquet data
query_file_join_read = spark.read.parquet("parquet/df_join")
query_file_join_read.printSchema()
query_file_join_read.show()

root
 |-- ID: integer (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Top-up_Month: string (nullable = true)

+-----+-------------------+-------------------+-------------------+-----------------+
|   ID|       window_start|         window_end|                 ts|     Top-up_Month|
+-----+-------------------+-------------------+-------------------+-----------------+
|17615|2022-10-18 04:19:30|2022-10-18 04:20:00|2022-10-18 04:19:42|No Top-up Service|
|17645|2022-10-18 04:19:30|2022-10-18 04:20:00|2022-10-18 04:19:52|No Top-up Service|
|17582|2022-10-18 04:19:30|2022-10-18 04:20:00|2022-10-18 04:19:37|No Top-up Service|
|17740|2022-10-18 04:20:00|2022-10-18 04:20:30|2022-10-18 04:20:19|No Top-up Service|
|17881|2022-10-18 04:20:30|2022-10-18 04:21:00|2022-10-18 04:20:56|No Top-up Service|
+-----+-------------------+-------------------+-------------------+-----------------+



## 2.7 Load the machine learning models given and use the model to predict whether users will be joining the top-up service. Save the results in parquet format. (When you save the data to parquet format,you need to rename “Top-up Month” to “Top-up_Month” first. And only keep these columns “ID”, “window_start”, “window_end”, “ts”, “prediction”, “Top-up_Month”) Renaming “Top-up Month” will happen in this question as well


In [35]:
from pyspark.ml import PipelineModel
model = PipelineModel.load('topup_pipeline_model')

In [36]:
prediction = model.transform(df_join)

In [37]:
prediction_store = prediction.select("ID", "window_start", "window_end", "prediction", "Top-up_Month")

In [38]:
model_file = prediction_store.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/model")\
        .option("checkpointLocation", "parquet/model/checkpoint")\
        .start()

In [44]:
model_file.stop()

In [43]:
# Read the saved parquet data
model_file_read = spark.read.parquet("parquet/model")
model_file_read.printSchema()
model_file_read.show()

root
 |-- ID: integer (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- prediction: double (nullable = false)
 |-- Top-up_Month: string (nullable = true)

+-----+-------------------+-------------------+----------+-----------------+
|   ID|       window_start|         window_end|prediction|     Top-up_Month|
+-----+-------------------+-------------------+----------+-----------------+
|18424|2022-10-18 04:23:30|2022-10-18 04:24:00|       0.0|No Top-up Service|
|18426|2022-10-18 04:23:30|2022-10-18 04:24:00|       1.0|No Top-up Service|
|18643|2022-10-18 04:24:30|2022-10-18 04:25:00|       0.0|No Top-up Service|
|18676|2022-10-18 04:24:30|2022-10-18 04:25:00|       0.0|No Top-up Service|
|18681|2022-10-18 04:24:30|2022-10-18 04:25:00|       0.0|No Top-up Service|
|18665|2022-10-18 04:24:30|2022-10-18 04:25:00|       0.0|No Top-up Service|
|18808|2022-10-18 04:25:00|2022-10-18 04:25:30|       0.0|No Top-up Service|
|18921|202

## 2.8. Only keep the customer predicted as our target customers (willing to join the top-up service). Normally, we should only keep “Top-up=1”. But due to the limited performance of our VM, if your process is extremely slow, you can abandon the filter and keep all of the data. Then for each batch, show the epoch id and count of the dataframe.

In [45]:

predicted_value = prediction.select("ID", "window_start", "window_end", "prediction", "Top-up_Month", "State")

In [46]:
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    if df.count() <= 0:
        print(f"bactch_id: {epoch_id}, count: {df.count()}")
    else:
        print(f"bactch_id: {epoch_id}, count: {df.count()}")
        
        # filter predition and count state
        df = df.select("window_end", "State", "prediction").filter("prediction == 1").groupBy("window_end", "State").agg(F.count("State").alias("count"))
        
        # convert state and count column to json format
        df = df.groupBy("window_end").agg(
        F.collect_list(
            to_json(struct("State", "count"))).alias("count")
        )
        
        # change name column
        df = df.withColumnRenamed("window_end", "key").withColumnRenamed("count", "value").sort("key")


        
        # write static data with parquet
        write = df.write.mode('append').format('parquet').option("path", "parquet/plot")\
        .option("checkpointLocation", "parquet/plot/checkpoint")\
        .save()
        
        
        
        
        
        # show dataframe
        if df.count() > 0:
            df.show(20)


In [47]:
query1 = predicted_value.writeStream\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='5 seconds')\
        .start()

In [49]:
# read data parquet
query_file_sink_df = spark.read.parquet("parquet/plot")

# get schema
schema_df = query_file_sink_df.schema

In [50]:
# read stream schema
paquet_df = spark.readStream.schema(schema_df).parquet("parquet/plot")

In [51]:
# write stream paquet
query_parquet = paquet_df\
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "final_best") \
    .option("checkpointLocation", "final_best") \
    .start()

bactch_id: 5, count: 1
bactch_id: 6, count: 4
bactch_id: 7, count: 6
bactch_id: 8, count: 4
+-------------------+--------------------+
|                key|               value|
+-------------------+--------------------+
|2022-10-18 04:34:30|[{"State":"RAJAST...|
|2022-10-18 04:35:00|[{"State":"RAJAST...|
+-------------------+--------------------+

bactch_id: 9, count: 13
+-------------------+--------------------+
|                key|               value|
+-------------------+--------------------+
|2022-10-18 04:35:30|[{"State":"RAJAST...|
+-------------------+--------------------+

bactch_id: 10, count: 12


In [53]:
# stop both query
query1.stop()
query_parquet.stop()