In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import base64
from cryptography.fernet import Fernet

kafka_topic_name = "FC_Account_Master"
kafka_bootstrap_servers = 'localhost:9092'

spark = SparkSession \
        .builder \
        .appName("Structured Streaming ") \
        .master("local[*]") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "earliest") \
        .load()

df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")


df_schema_string = "order_id INT, encrypted_account_number STRING, product STRING, product_category STRING, encryption_key STRING"


def decrypt(encrypted_data, encryption_key):
  fernet = Fernet(encryption_key.encode('utf-8'))
  decrypted_data = fernet.decrypt(base64.b64decode(encrypted_data)).decode('utf-8')
  return decrypted_data

decrypt_udf = udf(decrypt, StringType())


def extract_key(message):
  return message.split(",")[-1]

extract_key_udf = udf(extract_key, StringType())

df2 = df1.select(from_csv(col("value"), df_schema_string).alias("data"), "timestamp")
df2.printSchema()

df3 = df2.withColumn("encryption_key", extract_key_udf(col("encryption_key")))
df3 = df3.withColumn("account_number", decrypt_udf("encrypted_account_number", "encryption_key"))
df3 = df3.select("data.*", "timestamp")

df3.createOrReplaceTempView("data_find");
song_find_text = spark.sql("SELECT * FROM data_find")
data_agg_write_stream = song_find_text \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("testedTable") \
        .start()

data_agg_write_stream.awaitTermination(1)


root
 |-- data: struct (nullable = true)
 |    |-- order_id: integer (nullable = true)
 |    |-- encrypted_account_number: string (nullable = true)
 |    |-- product: string (nullable = true)
 |    |-- product_category: string (nullable = true)
 |    |-- encryption_key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `encryption_key` cannot be resolved. Did you mean one of the following? [`data`, `timestamp`].;
'Project [data#256, timestamp#244, extract_key('encryption_key)#259 AS encryption_key#260]
+- Project [from_csv(StructField(order_id,IntegerType,true), StructField(encrypted_account_number,StringType,true), StructField(product,StringType,true), StructField(product_category,StringType,true), StructField(encryption_key,StringType,true), value#253, Some(Asia/Kathmandu), None) AS data#256, timestamp#244]
   +- Project [cast(value#240 as string) AS value#253, timestamp#244]
      +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@32028d47, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7ba8ed23, [kafka.bootstrap.servers=localhost:9092, startingOffsets=earliest, subscribe=FC_Account_Master], [key#239, value#240, topic#241, partition#242, offset#243L, timestamp#244, timestampType#245], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@aeb221,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:9092, subscribe -> FC_Account_Master, startingOffsets -> earliest),None), kafka, [key#232, value#233, topic#234, partition#235, offset#236L, timestamp#237, timestampType#238]


In [2]:
df = spark.sql("SELECT * FROM testedTable")
df.show(3)

+--------+-------------------+-------+----------------+--------------------+
|order_id|     account_number|product|product_category|           timestamp|
+--------+-------------------+-------+----------------+--------------------+
|       0|02XYZXYZ10015592101|    SBA|           SBPPS|2024-05-13 12:14:...|
|       1|02XYZXYZ10015593701|    SBA|           SBPPS|2024-05-13 12:14:...|
|       2|02XYZXYZ10015593801|    SBA|           SBPPS|2024-05-13 12:14:...|
+--------+-------------------+-------+----------------+--------------------+
only showing top 3 rows



In [9]:
df_count = df.count()
df_count

246

In [8]:
df_count = df.count()
df_count

241