## 2. Spark Streaming

Once we have created our producer, we will build a Spark streaming application that consumes the producer's data and predicts the Top-up outcome, as follows:


**Note: all the code is derived from the Week 10 and Week 11 tutorials, with some modifications.**

First, we connect our notebook to Kafka and import the libraries. Then we create a Spark configuration as below:

### 2.1 Import libraries and make a Spark configuration

In [1]:
#Import all the libraries
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import col, decode, expr,udf
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml import PipelineModel

#Make a Spark config that runs locally with 2 threads (local[2]) and sets the session time zone to UTC
conf = SparkConf().setAppName("Top-up data streaming").setMaster("local[2]")
spark = SparkSession \
    .builder \
    .config(conf = conf) \
    .config('spark.sql.session.timeZone', 'UTC') \
    .getOrCreate()

### 2.2 Ingest the streaming data into spark streaming

Next, we create a streaming dataframe for each of the two topics that our producers publish to, as follows:

In [2]:
#Dataframe streaming of customer topic
customer_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", "Customer") \
  .load()

#Dataframe streaming of bureau topic
bureau_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
  .option("subscribe", "Bureau")\
  .load()

### 2.3 Transform our spark streaming into a proper format

After loading the streams received from our producers into streaming dataframes, we convert them into a format that matches our metadata, as described below.

First, we will select only the key and value of customer and bureau.

In [3]:
#Select the key and value of both customer and bureau streaming dataframe
customer_df = customer_stream.selectExpr("CAST(key AS STRING) AS key_customer", "CAST(value AS STRING) AS value_customer")
bureau_df = bureau_stream.selectExpr("CAST(key AS STRING) AS key_bureau", "CAST(value AS STRING) AS value_bureau")

Since the streaming data is converted to strings before it is sent, we use `ArrayType(StructType([...]))` with a list of `StructField`s to read every variable as a string first (apart from `ts`, which is read as a timestamp) and cast them to their proper types later. The same approach is used for both customer and bureau.

In [4]:
#Assign the Schema based on the metadata of customer data
Customer_schema = ArrayType(StructType([StructField('ID', StringType(), True), 
    StructField('Frequency', StringType(), True),
    StructField('InstlmentMode', StringType(), True),
    StructField('LoanStatus', StringType(), True),
    StructField('PaymentMode', StringType(), True),
    StructField('BranchID', StringType(), True),
    StructField('Area', StringType(), True),
    StructField('Tenure', StringType(), True), #Int
    StructField('AssetCost', StringType(), True), #Int
    StructField('AmountFinance', StringType(), True), #INT
    StructField('DisbursalAmount', StringType(), True), #INT
    StructField('EMI', StringType(), True), #INT
    StructField('DisbursalDate', StringType(), True),
    StructField('MaturityDAte', StringType(), True),
    StructField('AuthDate', StringType(), True),
    StructField('AssetID', StringType(), True),
    StructField('ManufacturerID', StringType(), True),
    StructField('SupplierID', StringType(), True),
    StructField('LTV', StringType(), True), #INT
    StructField('SEX', StringType(), True),
    StructField('AGE', StringType(), True), #INT
    StructField('MonthlyIncome', StringType(), True), #INT
    StructField('City', StringType(), True),
    StructField('State', StringType(), True),
    StructField('ZiPCODE', StringType(), True),
    StructField('Top-up Month', StringType(), True),
    StructField('ts', TimestampType(), True)]))

Before casting each variable to its true data type, we need to unnest the data: `select` with `from_json` parses the JSON array, and `explode` then turns each array element into its own row.

In [5]:
#Parse the JSON array and unnest it into one column per variable
customer_df=customer_df.select(F.from_json(F.col("value_customer").cast("string"), Customer_schema).alias('parsed_value'))
customer_df = customer_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
customer_df = customer_df.select(F.col('unnested_value.ID').alias('ID'),
                                F.col('unnested_value.Frequency').alias('Frequency'),
                                F.col('unnested_value.InstlmentMode').alias('InstlmentMode'),
                                F.col('unnested_value.LoanStatus').alias('LoanStatus'),
                                F.col('unnested_value.PaymentMode').alias('PaymentMode'),
                                F.col('unnested_value.BranchID').alias('BranchID'),
                                F.col('unnested_value.Area').alias('Area'),
                                F.col('unnested_value.Tenure').alias('Tenure'),
                                F.col('unnested_value.AssetCost').alias('AssetCost'),
                                F.col('unnested_value.AmountFinance').alias('AmountFinance'),
                                F.col('unnested_value.DisbursalAmount').alias('DisbursalAmount'),
                                F.col('unnested_value.EMI').alias('EMI'),
                                F.col('unnested_value.DisbursalDate').alias('DisbursalDate'),
                                F.col('unnested_value.MaturityDAte').alias('MaturityDAte'),
                                F.col('unnested_value.AuthDate').alias('AuthDate'),
                                F.col('unnested_value.AssetID').alias('AssetID'),
                                F.col('unnested_value.ManufacturerID').alias('ManufacturerID'),
                                F.col('unnested_value.SupplierID').alias('SupplierID'),
                                F.col('unnested_value.LTV').alias('LTV'),
                                F.col('unnested_value.SEX').alias('SEX'),
                                F.col('unnested_value.AGE').alias('AGE'),
                                F.col('unnested_value.MonthlyIncome').alias('MonthlyIncome'),
                                F.col('unnested_value.City').alias('City'),
                                F.col('unnested_value.State').alias('State'),
                                F.col('unnested_value.ZiPCODE').alias('ZiPCODE'),
                                F.col('unnested_value.Top-up Month').alias('Top-up Month'),
                                F.col('unnested_value.ts').alias('ts'))
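
To make the parse-then-unnest step more concrete, here is a minimal plain-Python sketch of the same idea, without Spark. The message contents and the helper name `parse_and_explode` are hypothetical and only illustrate the shape of the data; the notebook itself relies on `from_json` and `explode`.

```python
import json

# Hypothetical Kafka message value: a JSON array holding two customer records.
# Field names follow the schema above; the values are made up for illustration.
message_value = json.dumps([
    {"ID": "1", "State": "KERALA", "AGE": "35"},
    {"ID": "2", "State": "PUNJAB", "AGE": "41"},
])

def parse_and_explode(value, fields):
    """Parse a JSON array string and return one dict (row) per array element.

    Fields missing from a record become None, much like a nullable StructField.
    """
    records = json.loads(value)  # plays the role of from_json
    # One output row per array element, keeping only the requested fields
    # (plays the role of explode followed by select).
    return [{f: rec.get(f) for f in fields} for rec in records]

rows = parse_and_explode(message_value, ["ID", "State", "AGE", "Tenure"])
for row in rows:
    print(row)
```

One message with two array elements becomes two rows, and the `Tenure` field that is absent from the sample records comes back as `None`.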

After that, we cast the columns to the data types given in our metadata. The same process is then repeated for the bureau data.

In [6]:
#Cast the column datatype based on the metadata information
customer_df = customer_df.withColumn('AssetCost',col('AssetCost').cast('Double'))\
              .withColumn('AmountFinance',col('AmountFinance').cast('Double'))\
              .withColumn('DisbursalAmount',col('DisbursalAmount').cast('Double'))\
              .withColumn('EMI',col('EMI').cast('Double'))\
              .withColumn('LTV',col('LTV').cast('Float'))\
              .withColumn('AGE',col('AGE').cast('Integer'))\
              .withColumn('MonthlyIncome',col('MonthlyIncome').cast('Float'))

In [7]:
#Assign the schema based on the metadata of bureau data
bureau_schema = ArrayType(StructType([StructField('ID', StringType(), True), 
    StructField('SELF-INDICATOR', StringType(), True),
    StructField('MATCH-TYPE', StringType(), True),
    StructField('ACCT-TYPE', StringType(), True),
    StructField('CONTRIBUTOR-TYPE', StringType(), True),
    StructField('DATE-REPORTED', StringType(), True),
    StructField('OWNERSHIP-IND', StringType(), True),
    StructField('ACCOUNT-STATUS', StringType(), True),
    StructField('DISBURSED-DT', StringType(), True),
    StructField('CLOSE-DT', StringType(), True),
    StructField('LAST-PAYMENT-DATE', StringType(), True), 
    StructField('CREDIT-LIMIT/SANC AMT', StringType(), True), #Int
    StructField('DISBURSED-AMT/HIGH CREDIT', StringType(), True), #Int
    StructField('INSTALLMENT-AMT', StringType(), True), #Int
    StructField('CURRENT-BAL', StringType(), True), #Int
    StructField('INSTALLMENT-FREQUENCY', StringType(), True),
    StructField('OVERDUE-AMT', StringType(), True), #Int
    StructField('WRITE-OFF-AMT', StringType(), True), #Int
    StructField('ASSET_CLASS', StringType(), True),
    StructField('REPORTED DATE - HIST', StringType(), True),
    StructField('DPD - HIST', StringType(), True),
    StructField('CUR BAL - HIST', StringType(), True),
    StructField('AMT OVERDUE - HIST', StringType(), True),
    StructField('AMT PAID - HIST', StringType(), True),
    StructField('TENURE', StringType(), True), #int
    StructField('ts', TimestampType(), True)]))

In [8]:
#Parse the JSON array and unnest it into one column per variable
bureau_df=bureau_df.select(F.from_json(F.col("value_bureau").cast("string"), bureau_schema).alias('parsed_value'))
bureau_df = bureau_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
bureau_df = bureau_df.select(F.col('unnested_value.ID').alias('ID'),
                                F.col('unnested_value.SELF-INDICATOR').alias('SELF-INDICATOR'),
                                F.col('unnested_value.MATCH-TYPE').alias('MATCH-TYPE'),
                                F.col('unnested_value.ACCT-TYPE').alias('ACCT-TYPE'),
                                F.col('unnested_value.CONTRIBUTOR-TYPE').alias('CONTRIBUTOR-TYPE'),
                                F.col('unnested_value.DATE-REPORTED').alias('DATE-REPORTED'),
                                F.col('unnested_value.OWNERSHIP-IND').alias('OWNERSHIP-IND'),
                                F.col('unnested_value.ACCOUNT-STATUS').alias('ACCOUNT-STATUS'),
                                F.col('unnested_value.DISBURSED-DT').alias('DISBURSED-DT'),
                                F.col('unnested_value.CLOSE-DT').alias('CLOSE-DT'),
                                F.col('unnested_value.LAST-PAYMENT-DATE').alias('LAST-PAYMENT-DATE'),
                                F.col('unnested_value.CREDIT-LIMIT/SANC AMT').alias('CREDIT-LIMIT/SANC AMT'),
                                F.col('unnested_value.DISBURSED-AMT/HIGH CREDIT').alias('DISBURSED-AMT/HIGH CREDIT'),
                                F.col('unnested_value.INSTALLMENT-AMT').alias('INSTALLMENT-AMT'),
                                F.col('unnested_value.CURRENT-BAL').alias('CURRENT-BAL'),
                                F.col('unnested_value.INSTALLMENT-FREQUENCY').alias('INSTALLMENT-FREQUENCY'),
                                F.col('unnested_value.OVERDUE-AMT').alias('OVERDUE-AMT'),
                                F.col('unnested_value.WRITE-OFF-AMT').alias('WRITE-OFF-AMT'),
                                F.col('unnested_value.ASSET_CLASS').alias('ASSET_CLASS'),
                                F.col('unnested_value.REPORTED DATE - HIST').alias('REPORTED DATE - HIST'),
                                F.col('unnested_value.DPD - HIST').alias('DPD - HIST'),
                                F.col('unnested_value.CUR BAL - HIST').alias('CUR BAL - HIST'),
                                F.col('unnested_value.AMT OVERDUE - HIST').alias('AMT OVERDUE - HIST'),
                                F.col('unnested_value.AMT PAID - HIST').alias('AMT PAID - HIST'),
                                F.col('unnested_value.TENURE').alias('TENURE'),
                                F.col('unnested_value.ts').alias('ts'))

The only difference for the bureau dataframe is that some values need to be cleaned before they can be cast. We do this by removing commas and any postfix from these values, as follows:

In [9]:
#Columns whose format needs to change before casting
format_change = ['CREDIT-LIMIT/SANC AMT', 'DISBURSED-AMT/HIGH CREDIT', 'INSTALLMENT-AMT', 'CURRENT-BAL', 'OVERDUE-AMT']
for c in format_change:
    #Remove commas with a built-in function instead of a Python udf (null values stay null)
    bureau_df = bureau_df.withColumn(c, F.regexp_replace(F.col(c), ",", ""))

#Change the value of 'INSTALLMENT-AMT' column (NNK,2022c)
#A raw string avoids Python's invalid escape sequence warning for "\$"
bureau_df=bureau_df.withColumn('INSTALLMENT-AMT',F.regexp_replace(F.col('INSTALLMENT-AMT'),r"[\$#,A-Za-z/]", ""))

In [10]:
#Cast the datatype to the correct one based on the metadata
bureau_df = bureau_df.withColumn('CREDIT-LIMIT/SANC AMT',col('CREDIT-LIMIT/SANC AMT').cast('Double'))\
              .withColumn('DISBURSED-AMT/HIGH CREDIT',col('DISBURSED-AMT/HIGH CREDIT').cast('Double'))\
              .withColumn('INSTALLMENT-AMT',col('INSTALLMENT-AMT').cast('Double'))\
              .withColumn('CURRENT-BAL',col('CURRENT-BAL').cast('Double'))\
              .withColumn('OVERDUE-AMT',col('OVERDUE-AMT').cast('Double'))\
              .withColumn('WRITE-OFF-AMT',col('WRITE-OFF-AMT').cast('Double'))\
              .withColumn('TENURE',col('TENURE').cast('Integer'))
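
The reason for cleaning before casting can be shown with a small plain-Python sketch. It assumes the raw values look like the hypothetical samples below; the helpers `clean_amount` and `to_double` are illustrative names, not part of the notebook. In Spark's default (non-ANSI) mode, casting a string that is not a valid number to a double yields null, which `to_double` imitates by returning `None`.

```python
import re

def clean_amount(raw):
    """Strip '$', '#', commas, letters and '/' from a raw amount string."""
    if raw is None:
        return None
    return re.sub(r"[\$#,A-Za-z/]", "", raw)

def to_double(text):
    """Convert to float, returning None when the text is not a valid number."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None

# Hypothetical raw values, chosen only to exercise the cleaning rules.
samples = ["1,405/Monthly", "12,000", None, "2500"]

uncleaned = [to_double(s) for s in samples]
cleaned = [to_double(clean_amount(s)) for s in samples]
print(uncleaned)
print(cleaned)
```

Without cleaning, the first two samples cannot be converted; after cleaning they become usable numbers, while missing values stay missing.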

Once we have finished formatting our dataframes, we apply a watermark to both of them with `.withWatermark`, using the `ts` variable and a threshold of 5 seconds.

In [11]:
#Set the watermark to both bureau and customer dataframe
customer_df = customer_df.withWatermark("ts", "5 seconds")
bureau_df = bureau_df.withWatermark("ts", "5 seconds")
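
As a rough intuition for what a 5-second watermark means, the following toy model tracks the latest event time seen and treats anything older than that time minus the delay as late. This is a simplified sketch under stated assumptions: Spark updates the watermark per micro-batch rather than per event and only applies it in stateful operations, and the class `WatermarkTracker` is a hypothetical name used for illustration.

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Toy model of an event-time watermark with a fixed delay threshold."""

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None

    @property
    def watermark(self):
        """Latest event time seen so far minus the allowed delay."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def observe(self, event_time):
        """Return True if the event is on time, False if it is older than the watermark."""
        wm = self.watermark
        on_time = wm is None or event_time >= wm
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return on_time

tracker = WatermarkTracker(delay=timedelta(seconds=5))
base = datetime(2022, 10, 18, 2, 17, 0)
# Events arrive out of order at offsets 0 s, 10 s, 6 s and 4 s.
results = [tracker.observe(base + timedelta(seconds=s)) for s in (0, 10, 6, 4)]
print(results)
```

After the event at 10 s, the watermark sits at 5 s, so the event at 6 s is still accepted while the one at 4 s is considered too late.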

### 2.4 Group the bureau stream based on ID with 30 seconds window duration.

In this section, there are two parts:
1. Convert `SELF-INDICATOR` to 1 and 0 for true and false values by using a udf, which also changes its data type to integer.
2. Group the bureau data by ID with a window duration of 30 seconds. This is done by combining `groupBy` with the `window` function on `ts` with a duration of 30 seconds. We then aggregate each variable according to its data type, similar to Task 2A, as below:

In [12]:
#Convert the 'True'/'False' string to 1/0
def change_boolean(s):
    check = 0
    if s == 'True':
        check = 1
    else:
        check = 0
    return check

#Apply the udf to self-indicator and change the datatype based on the value
bool_udf = udf(change_boolean,IntegerType())

bureau_df = bureau_df.withColumn("SELF-INDICATOR",bool_udf("SELF-INDICATOR"))

In [13]:
#Identify which columns are numeric and which are strings so that we can aggregate them in the next part
#isinstance does not depend on how the data type is printed, which differs between Spark versions
numeric_column = [field.name for field in bureau_df.schema.fields
                  if not isinstance(field.dataType, (StringType, TimestampType))]
string_column = [c for c in bureau_df.columns if c not in numeric_column and c not in ('ID', 'ts')]

In [14]:
#Group the bureau dataframe: sum the numeric columns and count distinct values of the string columns
bureau_new_df = bureau_df.groupBy(window(bureau_df.ts, "30 seconds"),"ID")\
        .agg(*[F.sum(c).alias(c+'_sum') for c in numeric_column],
             *[F.approx_count_distinct(c).alias(c+'_dist') for c in string_column])
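
The way a timestamp is assigned to a 30-second tumbling window can be sketched in plain Python. This assumes epoch-aligned windows that include their start and exclude their end, which is Spark's default behaviour for `window`; the function name `tumbling_window` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts, seconds=30):
    """Return (start, end) of the epoch-aligned window [start, end) containing ts."""
    epoch = int(ts.timestamp())
    start = epoch - (epoch % seconds)  # round down to the nearest window boundary
    start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
    return start_dt, start_dt + timedelta(seconds=seconds)

ts = datetime(2022, 10, 18, 2, 17, 10, tzinfo=timezone.utc)
start, end = tumbling_window(ts)
print(start.time(), end.time())
```

A `ts` of 02:17:10 falls into the window from 02:17:00 to 02:17:30, and all bureau rows of one ID inside that window are aggregated into a single output row.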

### 2.5 Join the two streaming dataframes based on window duration

After grouping, we create two new variables, `window_start` and `window_end`, from the start and end of the window column before joining the two dataframes:

In [15]:
#Create window_start and window_end column
bureau_new_df = bureau_new_df.withColumn('window_start',bureau_new_df.window.start)\
                .withColumn('window_end',bureau_new_df.window.end)

In [16]:
#Check whether we successfully created the variables by using printSchema
bureau_new_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- ID: string (nullable = true)
 |-- SELF-INDICATOR_sum: long (nullable = true)
 |-- CREDIT-LIMIT/SANC AMT_sum: double (nullable = true)
 |-- DISBURSED-AMT/HIGH CREDIT_sum: double (nullable = true)
 |-- INSTALLMENT-AMT_sum: double (nullable = true)
 |-- CURRENT-BAL_sum: double (nullable = true)
 |-- OVERDUE-AMT_sum: double (nullable = true)
 |-- WRITE-OFF-AMT_sum: double (nullable = true)
 |-- TENURE_sum: long (nullable = true)
 |-- MATCH-TYPE_dist: long (nullable = false)
 |-- ACCT-TYPE_dist: long (nullable = false)
 |-- CONTRIBUTOR-TYPE_dist: long (nullable = false)
 |-- DATE-REPORTED_dist: long (nullable = false)
 |-- OWNERSHIP-IND_dist: long (nullable = false)
 |-- ACCOUNT-STATUS_dist: long (nullable = false)
 |-- DISBURSED-DT_dist: long (nullable = false)
 |-- CLOSE-DT_dist: long (nullable = false)
 |-- LAST-PAYMENT-DATE_dist: long (nullable = false

After that, we rename the `ID` variable in both the customer and bureau dataframes so that the two can be told apart. Next, we join them: a customer row is joined to a bureau row when both have the same ID and the customer's `ts` falls between the bureau row's `window_start` and `window_end`.

In [17]:
#Rename the ID column of the customer and new bureau dataframes so they can be joined
customer_df = customer_df.withColumnRenamed('ID','customer_ID')
bureau_new_df = bureau_new_df.withColumnRenamed('ID','bureau_ID')

In [18]:
#Join the two dataframes based on the condition described above
merge_df = customer_df.join(bureau_new_df,expr("""customer_ID = bureau_ID AND ts>=window_start AND ts<=window_end"""),"inner")
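
The join condition can be mimicked with a small plain-Python sketch to see which rows match. The rows below are hypothetical and `join_on_window` is an illustrative helper, not part of the notebook. Because both comparisons are inclusive, a `ts` that sits exactly on a window boundary matches two adjacent windows; if that is not intended, a strict `ts < window_end` would be one way to avoid it.

```python
from datetime import datetime

def join_on_window(customers, bureau_windows):
    """Inner join: same ID and window_start <= ts <= window_end (both ends inclusive)."""
    return [
        {**c, **b}
        for c in customers
        for b in bureau_windows
        if c["customer_ID"] == b["bureau_ID"]
        and b["window_start"] <= c["ts"] <= b["window_end"]
    ]

def t(hour, minute, second):
    """Shorthand for a timestamp on the (hypothetical) sample day."""
    return datetime(2022, 10, 18, hour, minute, second)

# Two adjacent 30-second windows for the same hypothetical ID.
bureau_windows = [
    {"bureau_ID": "244", "window_start": t(2, 16, 30), "window_end": t(2, 17, 0)},
    {"bureau_ID": "244", "window_start": t(2, 17, 0), "window_end": t(2, 17, 30)},
]

inside = join_on_window([{"customer_ID": "244", "ts": t(2, 17, 10)}], bureau_windows)
boundary = join_on_window([{"customer_ID": "244", "ts": t(2, 17, 0)}], bureau_windows)
other_id = join_on_window([{"customer_ID": "999", "ts": t(2, 17, 10)}], bureau_windows)
print(len(inside), len(boundary), len(other_id))
```

A timestamp inside a window yields one joined row, a timestamp exactly on the shared boundary yields two, and a customer with no matching bureau ID is dropped by the inner join.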

In [19]:
#Rename ID after joining
merge_df = merge_df.withColumnRenamed('customer_ID','ID')

In [20]:
merge_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Frequency: string (nullable = true)
 |-- InstlmentMode: string (nullable = true)
 |-- LoanStatus: string (nullable = true)
 |-- PaymentMode: string (nullable = true)
 |-- BranchID: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Tenure: string (nullable = true)
 |-- AssetCost: double (nullable = true)
 |-- AmountFinance: double (nullable = true)
 |-- DisbursalAmount: double (nullable = true)
 |-- EMI: double (nullable = true)
 |-- DisbursalDate: string (nullable = true)
 |-- MaturityDAte: string (nullable = true)
 |-- AuthDate: string (nullable = true)
 |-- AssetID: string (nullable = true)
 |-- ManufacturerID: string (nullable = true)
 |-- SupplierID: string (nullable = true)
 |-- LTV: float (nullable = true)
 |-- SEX: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MonthlyIncome: float (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- ZiPCODE: string (nullable

### 2.6 Store our new joined dataframe in Parquet format

Once the two dataframes are joined, we select these columns: `ID`, `window_start`, `window_end`, `ts`, `Top-up Month`. We then rename `Top-up Month` to `Top-up_Month` before writing the result as Parquet files to a storage location, as follows:

In [21]:
#Select the column based on instruction and rename the top-up month
df_format1 = merge_df.select('ID','window_start','window_end','ts','Top-up Month')
df_format1 = df_format1.withColumnRenamed('Top-up Month','Top-up_Month')

In [23]:
#parquet output for 2.6 (wait 1-2 minutes for it to save the data)
query_file_sink = df_format1.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/task2B_2.6")\
        .option("checkpointLocation", "parquet/task2B_2.6/checkpoint")\
        .start()

In [24]:
#Stop the storing 
query_file_sink.stop()

After stopping the query that writes the Parquet files, we display the result to make sure that it is not empty.

In [25]:
#Read the parquet file in the location and show it
query_file_sink_df = spark.read.parquet("parquet/task2B_2.6")
query_file_sink_df.printSchema()
query_file_sink_df.show()

root
 |-- ID: string (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- Top-up_Month: string (nullable = true)

+---+-------------------+-------------------+-------------------+-----------------+
| ID|       window_start|         window_end|                 ts|     Top-up_Month|
+---+-------------------+-------------------+-------------------+-----------------+
|244|2022-10-18 02:17:00|2022-10-18 02:17:30|2022-10-18 02:17:10|No Top-up Service|
|246|2022-10-18 02:17:00|2022-10-18 02:17:30|2022-10-18 02:17:10|No Top-up Service|
|229|2022-10-18 02:17:00|2022-10-18 02:17:30|2022-10-18 02:17:10|No Top-up Service|
|171|2022-10-18 02:16:30|2022-10-18 02:17:00|2022-10-18 02:16:54|No Top-up Service|
|183|2022-10-18 02:16:30|2022-10-18 02:17:00|2022-10-18 02:16:54|No Top-up Service|
|164|2022-10-18 02:16:30|2022-10-18 02:17:00|2022-10-18 02:16:45|No Top-up Service|
|147|2022-10-18 02:16:30|2022-1

After that, we remove the directory that stores the Parquet files so that rerunning the notebook does not raise an error.

**Note: This step will be repeated in the next two parts as well.**

In [26]:
#Remove the output path for task 2.6 so the notebook can be rerun
home_path = os.getcwd()

import shutil
def remove_folder(path):
    # check if folder exists
    if os.path.exists(path):
         # remove if exists
         shutil.rmtree(path)
    
remove_folder(home_path+"/parquet/task2B_2.6")

### 2.7 Import machine learning models and make a prediction

Next, we load our machine learning model from its directory with `PipelineModel.load` and call `setHandleInvalid("keep")` on the relevant pipeline stages so that rows with null or invalid values are kept rather than raising an error. Once that is done, we apply the model to our joined dataframe using `.transform`.

In [27]:
# task 2.7 making a prediction using our ML pipeline model
model = PipelineModel.load('topup_pipeline_model')
model.stages[-2].setHandleInvalid("keep") 
model.stages[-3].setHandleInvalid("keep")
model.stages[-4].setHandleInvalid("keep")
model_pred = model.transform(merge_df)

After applying the transformation and obtaining predictions from our machine learning model, we repeat the steps of the previous section, this time also selecting the `prediction` variable.

In [28]:
#Change the top-up month name and select the variable for showing
model_pred = model_pred.withColumnRenamed('Top-up Month','Top-up_Month')
df_pred =  model_pred.select('ID','window_start','window_end','ts','prediction','Top-up_Month')

In [29]:
#parquet output for 2.7 (wait 2 to 3 minutes to see the result in the next code chunk)
query_file_sink2 = df_pred\
        .writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/task2B_2.7")\
        .option("checkpointLocation", "parquet/task2B_2.7/checkpoint")\
        .start()

In [30]:
#Stop the query
query_file_sink2.stop()

In [31]:
#Show our result
query_file_sink_df2 = spark.read.parquet("parquet/task2B_2.7")
query_file_sink_df2.printSchema()
query_file_sink_df2.show()

root
 |-- ID: string (nullable = true)
 |-- window_start: timestamp (nullable = true)
 |-- window_end: timestamp (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- prediction: double (nullable = false)
 |-- Top-up_Month: string (nullable = true)

+---+-------------------+-------------------+-------------------+----------+-----------------+
| ID|       window_start|         window_end|                 ts|prediction|     Top-up_Month|
+---+-------------------+-------------------+-------------------+----------+-----------------+
|771|2022-10-18 02:19:30|2022-10-18 02:20:00|2022-10-18 02:19:58|       0.0|No Top-up Service|
|760|2022-10-18 02:19:30|2022-10-18 02:20:00|2022-10-18 02:19:58|       0.0|No Top-up Service|
|707|2022-10-18 02:19:30|2022-10-18 02:20:00|2022-10-18 02:19:41|       1.0|No Top-up Service|
|717|2022-10-18 02:19:30|2022-10-18 02:20:00|2022-10-18 02:19:41|       1.0|No Top-up Service|
|740|2022-10-18 02:19:30|2022-10-18 02:20:00|2022-10-18 02:19:49|       0.0|No 

In [32]:
#Remove the output path for task 2.7
remove_folder(home_path+"/parquet/task2B_2.7")

### 2.8 Get the top-up count for each State

This is the last section for this task. It begins by filtering the dataframe that holds the `prediction` result: we keep only the rows where `prediction` equals `1`, i.e. the customers predicted to top up. Then we select two columns, `window_end` and `State`, and rename them to `key` and `value`, as below:

In [33]:
#Filtering that result of prediction to 1 and select our target variable.
model_state = model_pred.filter(col('prediction')==1)
model_state = model_state.withColumnRenamed('window_end','key')
model_state = model_state.select('key','State')
model_state = model_state.withColumnRenamed('State','value')

Once we have that, we create a `foreachBatch` function that handles the multiple aggregation steps and stores the aggregated dataframe as Parquet, ready to be sent to the Kafka consumer, as below:

In [69]:
import datetime as dt
from datetime import datetime

# Task 2.8 Create function to aggregate each micro-batch and store it as parquet
def foreach_batch_function(df, epoch_id):
    """
    Aggregate one micro-batch: group by key and State, count the rows of each
    State, convert each (State, count) pair to a JSON string, and collect the
    JSON strings per key, where the key is the window end timestamp.
    The result is appended to a parquet directory.
    """
    batch_count = df.count()  #count once and reuse the result
    print(f"epoch_id: {epoch_id}; count:{batch_count}; now: {dt.datetime.now().strftime('%X')}")
    if batch_count > 0:
        df = df.groupBy("key","value").agg(F.count('value').alias('count'))
        df = df.withColumnRenamed('value','State')
        df = df.select("key",to_json(struct("State","count")).alias("json")) #(wffzxyl,2019)
        df = df.groupBy("key").agg(F.collect_set(F.col("json")).alias('value')) #(DBA108642,2020)
        df.select("key","value").write.format("parquet")\
            .mode('append')\
            .option("path", "parquet/task2B_2.8")\
            .save()
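
The shape of the value produced per key can be previewed with a plain-Python sketch of the same aggregation. The state names and timestamps are hypothetical, and `aggregate_batch` is an illustrative stand-in for the group, `to_json` and `collect_set` steps rather than the notebook's Spark code.

```python
import json
from collections import Counter, defaultdict

def aggregate_batch(rows):
    """Turn (key, state) pairs into {key: set of JSON strings with State and count}."""
    counts = Counter(rows)  # rows per (key, state), like groupBy("key", "value") with count
    result = defaultdict(set)
    for (key, state), n in counts.items():
        # Compact JSON without spaces, similar in shape to Spark's to_json output.
        payload = json.dumps({"State": state, "count": n}, separators=(",", ":"))
        result[key].add(payload)  # one set of JSON strings per key, like collect_set
    return dict(result)

# Hypothetical micro-batch: window end time as key, predicted top-up State as value.
batch = [
    ("2022-10-18 02:20:00", "KERALA"),
    ("2022-10-18 02:20:00", "KERALA"),
    ("2022-10-18 02:20:00", "PUNJAB"),
    ("2022-10-18 02:20:30", "KERALA"),
]
out = aggregate_batch(batch)
for key, value in out.items():
    print(key, sorted(value))
```

Each window end time maps to a collection of small JSON objects, one per State, which is the structure later cast to a string and sent to Kafka.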

With the function defined, we use `writeStream` with `foreachBatch` to start storing our results as Parquet.

In [70]:
#Wait until the printed dataframe count is greater than 0 before running the next chunk
query1 = model_state.writeStream.outputMode("append")\
        .foreachBatch(foreach_batch_function)\
        .trigger(processingTime='20 seconds')\
        .start()

epoch_id: 0; count:0; now: 14:18:40


Once the Parquet files have been created, we send them to the consumer with a while loop that repeatedly writes the Parquet data we read back to Kafka. The loop keeps running until we interrupt it manually. Lastly, as in the previous two sections, we stop the query and delete the directories so that the code can be reused in the future.

In [71]:
#Read the parquet files created by the previous code
final_query = spark.read.parquet("parquet/task2B_2.8")

In [72]:
#Run this to transfer our parquet data to the consumer side (don't run the next code chunk until we are satisfied)
while True:
    final_query.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")\
        .write.mode("append")\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
        .option("topic", "Top_up_count_state") \
        .option("checkpointLocation",home_path+"/kafka/checkpoint")\
        .save()

epoch_id: 1; count:0; now: 14:19:05
epoch_id: 2; count:6; now: 14:19:34
epoch_id: 3; count:22; now: 14:20:17
epoch_id: 4; count:29; now: 14:20:55
epoch_id: 5; count:29; now: 14:21:40


KeyboardInterrupt: 

In [73]:
query1.stop()

In [74]:
#Remove the directories so the code can be reused in the future
remove_folder(home_path+"/parquet/task2B_2.8")
remove_folder(home_path+"/kafka/checkpoint")

## References

- DBA108642. (2020, February 27). $\textit{Pyspark merge multiple columns into a json column}$. https://stackoverflow.com/questions/60435907/pyspark-merge-multiple-columns-into-a-json-column?fbclid=IwAR2LQtmt97HHogT_d95vZzCm36TXo3XqJaYR7imaiswpJEbMT8OP9cSelLg
- Jupyter Notebooks: FIT 5202 Data Processing in Big Data (2022). $\textit{Week 10 Clickstream Spark Streaming - Handling Json Array DEMO}$. https://lms.monash.edu/mod/resource/view.php?id=10523282
- Jupyter Notebooks: FIT 5202 Data Processing in Big Data (2022). $\textit{Week 10 Lab-Task Log Analysis [V 1.1]}$. https://lms.monash.edu/mod/resource/view.php?id=10523278
- Jupyter Notebooks: FIT 5202 Data Processing in Big Data (2022). $\textit{Week 10 Spark Stream Join Example}$. https://lms.monash.edu/mod/resource/view.php?id=10523281
- Jupyter Notebooks: FIT 5202 Data Processing in Big Data (2022). $\textit{Week 11 Spark Streaming Watermarking DEMO [V 1.1]}$. https://lms.monash.edu/mod/resource/view.php?id=10523304
- NNK. (2022, August 5). $\textit{PySpark Replace Column Values in DataFrame}$. https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/
- $\textit{Structured Streaming Programming Guide}$. (n.d.). https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- $\textit{Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)}$. (n.d.). https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
- wffzxyl. (2019, July 19). $\textit{How to convert some pyspark dataframe's column into a dict with its column name and combine them to be a json column?}$. https://stackoverflow.com/questions/57112873/how-to-convert-some-pyspark-dataframes-column-into-a-dict-with-its-column-name