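##### Author: Bishesh Kafle
##### Date: 2024-07-22

Reads CSV transaction records from the Kafka topic `Topic2` with Spark Structured Streaming, decrypts the AES-encrypted account number, and writes a snapshot of the results to the `stream_data` table of the `kafka_con` database.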

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
from dotenv import load_dotenv

# Populate os.environ from the project's .env file (expected to provide KEY, read in the next cell).
# The returned bool (True if at least one variable was set) is the cell's displayed output.
load_dotenv(dotenv_path='../config/.env')

True

In [2]:
# Kafka source settings: topic to subscribe to and the broker to bootstrap from.
kafka_topic_name = 'Topic2'
kafka_bootstrap_servers = 'localhost:9092'

# AES key used to decrypt `encrypted_account_number`. It is read from the environment,
# which load_dotenv populated from ../config/.env above.
# Fail fast here: a missing key would otherwise be spliced in as the text 'None', and a
# wrongly sized key only surfaces later as an opaque error inside the running Spark job.
KEY = os.getenv('KEY')
if not KEY:
    raise RuntimeError("Environment variable KEY is not set; check ../config/.env")
# Spark's aes_decrypt accepts only 16-, 24- or 32-byte keys; a string key is used as its UTF-8 bytes.
if len(KEY.encode('utf-8')) not in (16, 24, 32):
    raise ValueError("KEY must be 16, 24 or 32 bytes long to be used as an AES key")
# Local Spark session. The Kafka source is not bundled with PySpark, so the connector
# (built for Scala 2.12 / Spark 3.5.1) is fetched from Maven via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Structured Streaming")
    .master("local[*]")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")
    .getOrCreate()
)

# Only show ERROR-level log lines from here on to keep cell output readable.
spark.sparkContext.setLogLevel("ERROR")

# Streaming DataFrame over the Kafka topic, replayed from the earliest available offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Display the Kafka source schema; note that `key` and `value` arrive as binary.
df.printSchema()

# Explicit names for the Column functions used below (also covered by the wildcard import above).
from pyspark.sql.functions import aes_decrypt, lit, unbase64

# Kafka delivers `value` as binary; cast it to a string so it can be parsed as CSV,
# and keep the Kafka message timestamp alongside it.
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")

# Schema of each CSV record carried in the message value.
df_schema_string = "order_id INT, encrypted_account_number STRING, branch STRING, transaction_code STRING"

# Parse the CSV text into a struct column named `data`.
df2 = df1.select(from_csv(col("value"), df_schema_string).alias("data"), "timestamp")

# Flatten the struct into top-level columns, keeping the timestamp.
df3 = df2.select("data.*", "timestamp")

# Decrypt the account number: base64-decode, AES-decrypt (ECB mode, PKCS padding), then
# cast the plaintext bytes to a string. The key is passed as a literal Column instead of
# being spliced into a SQL string, so quotes or braces in KEY cannot break the expression.
df3 = df3.withColumn(
    'decrypted_account_number',
    aes_decrypt(
        unbase64(col('encrypted_account_number')),
        lit(KEY),
        lit('ECB'),
        lit('PKCS'),
    ).cast('string'),
)

# Register the processed stream as a temp view so it can be queried with SQL.
df3.createOrReplaceTempView("proc_rw_transaction_data")

# Name of the streaming query and of the in-memory table it writes; later cells query it.
STREAM_QUERY_NAME = "temp1_stream_data"

# Query names must be unique among active queries in a SparkSession, so re-running this
# cell would fail while the previous run's query is still active. Stop it first.
for active_query in spark.streams.active:
    if active_query.name == STREAM_QUERY_NAME:
        active_query.stop()

# Streaming DataFrame selecting every column of the processed temp view.
data = spark.sql("SELECT * FROM proc_rw_transaction_data")

# Every 5 seconds, append newly arrived rows to the in-memory table STREAM_QUERY_NAME.
data_agg_write_stream = (
    data.writeStream
    .trigger(processingTime='5 seconds')
    .outputMode("append")
    .format("memory")
    .queryName(STREAM_QUERY_NAME)
    .start()
)

# Block for at most 1 second so the query can start; the returned bool says whether
# the query terminated within that time (False while it keeps running).
data_agg_write_stream.awaitTermination(1)

24/07/24 09:31:43 WARN Utils: Your hostname, Bisheshs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.13.166.144 instead (on interface en0)
24/07/24 09:31:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/bses/.ivy2/cache
The jars for the packages stored in: /Users/bses/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-debd3d15-62d0-43ab-ab29-a769b19b752c;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/bses/F1_Intern/Kafka_Streaming/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 229ms :: artifacts dl 6ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#h

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



False

In [5]:
# Snapshot the rows the stream has written to the in-memory table so far
# (note: this rebinds `df`, which previously held the Kafka streaming DataFrame).
df = spark.table("temp1_stream_data").select(
    "decrypted_account_number", "branch", "transaction_code", "timestamp"
)
df.show()

+------------------------+------+----------------+--------------------+
|decrypted_account_number|branch|transaction_code|           timestamp|
+------------------------+------+----------------+--------------------+
|     02XYZXYZ10017529992|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017529992|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017517823|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017517823|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017517823|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017519116|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017519121|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017520752|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017520752|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017520752|    15|              CI|2024-07-24 09:26:...|
|     02XYZXYZ10017555515|    15|              CI|2024-07-24 09:

In [None]:
# Load the snapshot in `df` into the `stream_data` table of the `kafka_con` database.
# Import only the helper this cell uses instead of a wildcard, so it is clear where
# `df_table` comes from and no other names are silently shadowed.
# NOTE(review): argument order (pandas frame, database, table) is inferred from this
# call alone — confirm against utils.mysql_connection.df_table.
from utils.mysql_connection import df_table

# toPandas() collects the whole snapshot to the driver before writing it out.
df_table(df.toPandas(), 'kafka_con', 'stream_data')