In [2]:
import os
import random
import time

from pyspark.ml.feature import Normalizer, StandardScaler
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import aes_decrypt, col, lit, trim, unbase64

# --- Configuration -----------------------------------------------------------
# Kafka source used by the streaming cells below.
kafka_topic_name = "Topic1"
kafka_bootstrap_servers = 'localhost:9092'

# Local Spark session. The Kafka connector is pulled in through
# spark.jars.packages; NOTE(review): its version (3.5.1, Scala 2.12) presumably
# matches the installed Spark build — confirm when upgrading Spark.
spark = (
    SparkSession.builder
    .appName("Structured Streaming ")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# Keep the notebook output readable: only show errors from Spark's logger.
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# Subscribe to the Kafka topic as an unbounded streaming DataFrame, replaying
# the topic from the earliest available offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# The message payload arrives in the `value` column; cast it to text so it can
# be parsed as CSV, and keep the record timestamp alongside it.
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
df1.printSchema()





root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [4]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [5]:
# Target schema of each CSV-encoded message: the base64, AES-encrypted account
# number followed by the balance amount.
transaction_schema = "encrypted_Account_No STRING,`BALANCE AMT` DOUBLE"

# Parse the balance as text first. With the strict DOUBLE schema every balance
# in the recorded output came back NULL, so the field is present but is not a
# plain number. NOTE(review): presumably the producer emits formatting
# artefacts (thousands separators / quote characters, cf. the trailing `'` on
# the decrypted account numbers) — confirm against the producer's format.
raw_transaction_schema = "encrypted_Account_No STRING,`BALANCE AMT` STRING"
raw_transaction = from_csv(col("value"), raw_transaction_schema)

# Remove commas, quote characters and whitespace, then cast. Values that are
# still not numeric become NULL, as they did with the strict schema.
balance_amt = regexp_replace(
    raw_transaction.getField("BALANCE AMT"), "[,'\"\\s]", ""
).cast("double")

# Rebuild the same `transaction` struct (field names and types match
# transaction_schema) so downstream cells are unaffected.
df2 = df1.select(
    struct(
        raw_transaction.getField("encrypted_Account_No").alias("encrypted_Account_No"),
        balance_amt.alias("BALANCE AMT"),
    ).alias("transaction"),
    "timestamp",
)

df2.printSchema()

root
 |-- transaction: struct (nullable = true)
 |    |-- encrypted_Account_No: string (nullable = true)
 |    |-- BALANCE AMT: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [6]:
# Flatten the parsed `transaction` struct into top-level columns and keep the
# Kafka record timestamp next to them.
df3 = df2.select(col("transaction.*"), col("timestamp"))
df3.printSchema()

root
 |-- encrypted_Account_No: string (nullable = true)
 |-- BALANCE AMT: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [7]:
# Decrypt the account number and publish the stream to an in-memory table so
# it can be inspected with ad-hoc SQL from later cells.
#
# The AES key is read from the environment rather than living in the notebook.
# The fallback keeps the previous demo key so the notebook still runs unchanged;
# NOTE(review): drop the fallback before pointing this at real data.
aes_key = os.environ.get("TRANSACTION_AES_KEY", "1234567890abcdef")

# Build the expression with the functions API and pass the key as a literal
# instead of splicing it into a SQL string (a key containing a quote would
# break or alter the expression). aes_decrypt returns binary, so cast to text
# explicitly before trimming. Mode/padding must match the producer's settings;
# NOTE(review): ECB reveals repeated plaintext blocks — prefer GCM if the
# producer can be changed.
decrypted_account = trim(
    aes_decrypt(
        unbase64(col("encrypted_Account_No")), lit(aes_key), lit("ECB"), lit("PKCS")
    ).cast("string")
)

# NOTE(review): the recorded output shows a trailing `'` on every decrypted
# account number; it is part of the plaintext (trim only strips spaces) —
# confirm whether it should be cleaned upstream.
df4 = df3.withColumn("account_number", decrypted_account).select("account_number", "BALANCE AMT")

# Registered as a temp view so the stream can also be referenced from SQL.
df4.createOrReplaceTempView("transaction_find")
final_transaction = spark.sql("SELECT * FROM transaction_find")

# Memory sink: every 5-second micro-batch is appended to the in-memory table
# "Transaction_Table". The console-only "truncate" option was removed because
# the memory sink does not use it.
Transaction_agg_write_stream = (
    final_transaction.writeStream
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .format("memory")
    .queryName("Transaction_Table")
    .start()
)

# Waits at most 1 second; returns False while the query is still running.
# NOTE(review): the table may still be empty or partial if queried right after
# this cell — re-run the query cell once micro-batches have completed.
Transaction_agg_write_stream.awaitTermination(1)

False

In [9]:
# Take a batch snapshot of whatever the memory sink has collected so far.
# NOTE(review): this rebinds `df`, which earlier held the Kafka stream.
snapshot_sql = "SELECT * FROM Transaction_Table"
df = spark.sql(snapshot_sql)
df.show(n=10)

+--------------+-----------+
|account_number|BALANCE AMT|
+--------------+-----------+
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
| 409000611074'|       NULL|
+--------------+-----------+
only showing top 10 rows

