In [19]:
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import *
import pyspark
import json
import os
from delta.tables import *
from delta import *
import pandas as pd

In [20]:
from transaction import *
from user import *

In [21]:
# Build (or reuse) the Spark session with Delta Lake and the Kafka source on the classpath.
# Static settings such as `spark.jars.packages` and `spark.sql.extensions` only take effect
# when getOrCreate() actually creates a new session; restart the kernel after changing them.
# NOTE(review): delta-core 2.3.x is built for Spark 3.3 while spark-sql-kafka 3.4.0 targets
# Spark 3.4 — confirm the installed Spark version and align both (Delta 2.4.x pairs with 3.4).
spark = (SparkSession
         .builder
         .appName("firstTest")
         .config('spark.jars.packages', "io.delta:delta-core_2.12:2.3.0,io.delta:delta-storage:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")
         # The key is plural (`spark.sql.extensions`); the singular form is not a Spark
         # setting, so Delta's SQL extension would never be registered.
         .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
         .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .enableHiveSupport()
         .getOrCreate())

# Switch datetime parsing/formatting back to the pre-Spark-3.0 (SimpleDateFormat) behavior.
# NOTE(review): presumably needed for lenient timestamp-string parsing elsewhere — confirm.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

In [22]:
# Seed ledger loaded from CSV. Without schema inference every column arrives as a
# string, so `timestamp` (epoch seconds, judging by the sample output) is cast to long.
data = (
    spark.read
    .load("../data/dummy_bank_data.csv", format='csv', header='true')
    .withColumn('timestamp', col('timeStamp').cast(types.LongType()))
)

data.show(3)

In [23]:
# Persist the seed ledger as the Delta table the streaming query appends to;
# 'overwrite' replaces whatever a previous run left at this path.
data.write.save(
    "/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data",
    format='delta',
    mode='overwrite',
)

# Handle for Delta MERGE/UPDATE/DELETE operations (currently unused):
#deltaTable = DeltaTable.forPath(spark, "/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data")

In [24]:
# Smoke test of the project-local `user` / transaction classes (from `user.py`):
# build one interbank transfer and send it, displaying what send_kafka_message() returns.
# NOTE(review): the meaning of the id arguments is not visible here — presumably
# (institution id, user id) for sender and recipient; confirm against user.py.
tmp = user('001', "002")
tmp.initiate_interbank_transaction()  # creates `tmp.active_transaction`, used below
tmp.active_transaction.set_recipient_details('00003', "00005")
tmp.active_transaction.set_dollar_value(2015)
tmp.active_transaction.generate_kafka_message()
# Presumably publishes to Kafka; its return value is shown as the cell output.
sample_message = tmp.active_transaction.send_kafka_message()
sample_message

In [27]:
# Read the Delta ledger back to inspect its current contents.
existing_data = spark.read.load(
    "/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data",
    format='delta',
)

existing_data.show()

In [26]:
# Streaming ingest: read IPPS payment messages from Kafka, reshape each into a ledger
# row (institutionId, userId, netChange, timestamp, note) for the receiving account,
# and append it to the Delta table. `input_stream` holds the running StreamingQuery.
input_stream = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'central_topic')
    # The Kafka source option is `startingOffsets`; `earliestOffset` is not recognized
    # and was silently ignored.
    .option('startingOffsets', 'latest')
    .load()
    # Split the payload once. Kafka's `timestamp` column is a TimestampType, so casting
    # it to long yields epoch seconds directly, with no string round-trip through the
    # (legacy) date parser.
    .select(
        split(col('value').cast('string'), ',').alias('fields'),
        col('timestamp').cast(types.LongType()).alias('kafkaTimestamp'),
    )
    # Payload layout: sendingTimestamp,sendingInstitutionId,sendingUserId,
    #                 receivingInstitutionId,receivingUserId,dollarValue
    .select(
        col('fields').getItem(0).alias('sendingTimestamp'),
        col('fields').getItem(1).alias('sendingInstitutionId'),
        col('fields').getItem(2).alias('sendingUserId'),
        col('fields').getItem(3).alias('receivingInstitutionId'),
        col('fields').getItem(4).alias('receivingUserId'),
        col('fields').getItem(5).alias('dollarValue'),
        'kafkaTimestamp',
    )
    # Record the sender in a human-readable note.
    .withColumn('note',
                concat(lit('Sent via IPPS from '),
                       col('sendingInstitutionId'),
                       lit(' -- '),
                       col('sendingUserId'),
                       lit(' at '),
                       col('sendingTimestamp')))
    # Rename to the ledger schema: the receiving account is credited with dollarValue.
    .select(
        col('receivingInstitutionId').alias('institutionId'),
        col('receivingUserId').alias('userId'),
        col('dollarValue').alias('netChange'),
        col('kafkaTimestamp').alias('timestamp'),
        'note',
    )
    # Append to the Delta ledger.
    .writeStream
    .format('delta')
    .outputMode('append')
    # Keep the checkpoint out of the table root. Spark's offsets/commits files would
    # otherwise sit beside the Delta data files, where VACUUM treats them as untracked
    # files and may delete them. An `_`-prefixed subdirectory is ignored by Delta.
    # Changing this path makes the query start with a fresh checkpoint.
    .option('checkpointLocation',
            "/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data/_checkpoints/ipps_stream")
    .start("/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data")
)

In [None]:
+-------------+------+---------+----------+----------------+
|institutionId|userId|netChange| timestamp|            note|
+-------------+------+---------+----------+----------------+
|       000001| 00001|        0|1683256559|Initial Transfer|
|       000001| 00002|      100|1683256559|Initial Transfer|
+-------------+------+---------+----------+----------------+

In [None]:
def update_table_from_ipps(input_stream, batch_id):
    """Build updated ledger rows for one micro-batch of parsed IPPS messages.

    The ``(DataFrame, batch_id)`` signature matches a
    ``DataStreamWriter.foreachBatch`` handler. Each incoming payment is joined to
    the receiving user's most recent row in the Delta ledger, and that row's
    ``value`` is increased by the payment's ``dollarValue``.

    Args:
        input_stream: micro-batch DataFrame of parsed messages. It must provide
            ``sendingTimestamp``, ``sendingInstitutionId``, ``sendingUserId``,
            ``receivingUserId``, ``dollarValue`` and ``kafkaTimestamp``.
        batch_id: micro-batch identifier; only printed.

    Returns:
        DataFrame with columns ``institutionId``, ``userId``, ``value``,
        ``netChange``, ``note``, ``timestamp``. Nothing is written here.
        TODO: persist the result (e.g. append to the Delta table) once the
        ledger schema is settled.
    """
    # `Window` is not imported by the notebook's import cells. An explicit
    # `functions as F` import also avoids depending on the star import shadowing builtins.
    from pyspark.sql import Window, functions as F

    print(f'processing batchId; {batch_id}')

    # Keep exactly one row per user: the newest ledger entry, which presumably holds
    # the current balance in `value`. Without this filter the join below matches
    # every historical row of the user. Ties on `timestamp` are broken arbitrarily.
    newest_first = Window.partitionBy('userId').orderBy(F.col('timestamp').desc())
    latest_rows = (
        spark.read.format('delta')
        .load("/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data")
        .withColumn('_recency_rank', F.row_number().over(newest_first))
        .filter(F.col('_recency_rank') == 1)
        .drop('_recency_rank')
    )

    return (
        input_stream
        # Attach each payment to the receiver's current account row.
        # NOTE(review): matches on userId only; if user ids are unique only within an
        # institution, also match receivingInstitutionId to institutionId.
        .join(latest_rows, F.col('receivingUserId') == latest_rows.userId, 'inner')
        # Record the sender in a human-readable note (replaces the ledger row's note).
        .withColumn('note',
                    F.concat(F.lit('Sent via IPPS from '),
                             F.col('sendingInstitutionId'),
                             F.lit(' -- '),
                             F.col('sendingUserId'),
                             F.lit(' at '),
                             F.col('sendingTimestamp')))
        # New balance = previous balance + incoming amount.
        .withColumn('value', F.col('value') + F.col('dollarValue'))
        .select(
            'institutionId', 'userId', 'value',
            F.col('dollarValue').alias('netChange'),
            'note',
            F.col('kafkaTimestamp').alias('timestamp'),
        )
    )

In [None]:
# Hand-built ledger row used to try appending to the seed ledger `data`.
columns = ['institutionId', 'userId', 'value', 'netChange', 'timestamp', 'note']
new_data = (
    spark.createDataFrame([('000001', '00001', '3015', '1000', '1683256559.124', '')], columns)
    # Match `data`, whose `timestamp` was cast to long on load. A string column here
    # would make the union silently widen `data.timestamp` to string.
    .withColumn('timestamp', col('timestamp').cast(types.LongType()))
)

# union() pairs columns by position; unionByName() pairs them by name, so the result
# does not depend on the column order of the source CSV.
final_data = data.unionByName(new_data)

# Disabled: persist the hand-built row to the Delta ledger.
# (new_data.write
#      .format('delta')
#      .mode('append')
#      .option("mergeSchema", "true")
#      .save("/home/bcturner/instantPaymentProcessingSystem/main/data/dummy_delta_data")
# )