# This PySpark application reads streaming data from Kafka, applies several data-masking and obfuscation strategies intended to protect data privacy, and writes the transformed data to Google BigQuery. The implementation demonstrates the use of various PySpark functions and UDFs to achieve these transformations.

In [None]:
# Load the necessary libraries

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, sha2, concat_ws, expr, udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import random

In [None]:
# Initialize (or reuse) the Spark session.
# - spark.jars.packages pulls the Kafka source connector and the Kafka client.
# - "parentProject" is presumably read by the BigQuery connector as the GCP
#   project to bill/run jobs under — confirm against connector docs.
# NOTE(review): if a session already exists (e.g. a live notebook kernel),
# getOrCreate() returns it and spark.jars.packages will not take effect.
# NOTE(review): spark-sql-kafka-0-10 should match the cluster's Spark version;
# it is pinned to 3.2.1 here while the sink uses a Spark 3.4 BigQuery provider.
spark = (
    SparkSession.builder
    .appName("KafkaToBigQuery")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,"
        "org.apache.kafka:kafka-clients:3.8.0",
    )
    .config("parentProject", "team-plutus-iisc")
    .getOrCreate()
)

In [None]:
# Kafka topic and servers configuration.
# Each value may be overridden via an environment variable (an empty value is
# treated as unset); the fallbacks are the original hard-coded settings, so
# behaviour is unchanged when the variables are not set.
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS") or "10.142.0.3:9092"
kafka_topic = os.environ.get("KAFKA_TOPIC") or "visit-data-topic"

In [None]:
# Schema used to parse each Kafka message value as JSON.
# Every field is a nullable string, including the coordinates; lat/lon are
# cast to double later where numeric values are needed.
message_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            "hashed_device_id",
            "timezone_visit",
            "day_of_week_visit",
            "time_stamp",
            "lat_visit",
            "date_visit",
            "time_visit",
            "lon_visit",
        )
    ]
)

In [None]:
# Read data from the Kafka topic as an unbounded streaming DataFrame.
# The start position is controlled only by "startingOffsets", and it applies
# only when the query has no checkpoint yet; afterwards the query resumes from
# its checkpointed offsets. An un-prefixed "auto.offset.reset" option is not a
# Kafka-source option and would simply be ignored, so it is not set here.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    # Keep running (skipping the gap) instead of failing when offsets are no
    # longer available, e.g. after retention deleted them.
    .option("failOnDataLoss", "false")
    .option("kafka.security.protocol", "PLAINTEXT")
    .load()
)

In [None]:
# Kafka delivers `value` as binary: cast it to a string, parse it as JSON using
# message_schema, then flatten the resulting struct into top-level columns.
# NOTE: keys missing from a message (or an unparseable message) presumably
# surface as nulls here, so downstream UDFs should tolerate None.
parsed_df = (
    kafka_df
    .select(from_json(col("value").cast("string"), message_schema).alias("data"))
    .select("data.*")
)


In [None]:

# Define a simple substitution cipher for demonstration purposes
def substitution_cipher(text, shift=3):
    """Shift every character of *text* by *shift* code points, modulo 256.

    Demonstration only: the mapping is trivially reversible and should not be
    relied on as real masking. Characters with code points of 256 or above are
    folded into the 0-255 range, so the mapping is not reversible for them.

    Args:
        text: String to encode, or None (Spark passes SQL nulls to Python UDFs
            as None).
        shift: Number of code points to shift each character by; defaults to
            the original value of 3.

    Returns:
        The encoded string, or None when *text* is None. A null input is passed
        through instead of raising TypeError, which would fail the whole
        micro-batch.
    """
    if text is None:
        return None
    return "".join(chr((ord(ch) + shift) % 256) for ch in text)

# Wrap the cipher as a Spark UDF that yields a string column.
substitution_cipher_udf = udf(substitution_cipher, returnType=StringType())

In [None]:
# Data transformation steps: derive two masked variants of hashed_device_id.
#   masked_device_id_sha256       -> SHA-256 hex digest (sha2 with 256 bits)
#   masked_device_id_substitution -> the shift cipher applied through its UDF
# NOTE(review): an unsalted SHA-256 over a known ID space can be reversed by
# dictionary lookup, and the shift cipher is trivially invertible, so neither
# is strong anonymisation on its own.
transformed_df = (
    parsed_df
    .withColumn("masked_device_id_sha256", sha2(col("hashed_device_id"), 256))
    .withColumn(
        "masked_device_id_substitution",
        substitution_cipher_udf(col("hashed_device_id")),
    )
)

In [None]:
# Join latitude and longitude into one "lat,lon" string column (concat_ws
# skips null inputs).
# NOTE(review): this uses the raw, un-noised coordinates, and location_concat
# is not among the columns dropped before writing, so exact locations still
# reach the sink — confirm this is intended.
transformed_df = transformed_df.withColumn(
    "location_concat",
    concat_ws(",", "lat_visit", "lon_visit"),
)

# Obfuscate location by adding random noise
def add_noise(value, scale=0.01):
    """Return *value* shifted by uniform random noise drawn from [-scale, scale].

    Used as a Spark UDF on coordinate columns cast to double. A missing or
    non-numeric coordinate casts to null and arrives here as None; it is
    returned as None rather than raising TypeError, which would fail the whole
    micro-batch.

    Args:
        value: Coordinate value (presumably degrees — confirm upstream), or None.
        scale: Half-width of the noise interval; defaults to the original 0.01.

    Returns:
        The noisy float, or None when *value* is None.
    """
    if value is None:
        return None
    # NOTE(review): `random` is not a cryptographic RNG; fine for a demo, but
    # consider a stronger source if this noise is meant as a privacy guarantee.
    return value + random.uniform(-scale, scale)

# Spark treats UDFs as deterministic unless told otherwise and may then
# duplicate or eliminate calls during optimisation. This UDF returns random
# output, so it is explicitly marked non-deterministic.
add_noise_udf = udf(add_noise, DoubleType()).asNondeterministic()

# Add noise to each coordinate (cast from string to double first), then join
# the noisy pair into a "lat,lon" string.
transformed_df = (
    transformed_df
    .withColumn("lat_visit_noisy", add_noise_udf(col("lat_visit").cast(DoubleType())))
    .withColumn("lon_visit_noisy", add_noise_udf(col("lon_visit").cast(DoubleType())))
    .withColumn(
        "location_noisy",
        concat_ws(",", col("lat_visit_noisy"), col("lon_visit_noisy")),
    )
)

In [None]:
# Remove the raw coordinates, the raw device id, and the intermediate noisy
# coordinate columns (their combined form is kept in location_noisy).
# NOTE(review): location_concat, which was built from the raw lat/lon, is not
# dropped here, so un-noised coordinates remain in the output.
transformed_df = transformed_df.drop(
    "lat_visit",
    "lon_visit",
    "hashed_device_id",
    "lat_visit_noisy",
    "lon_visit_noisy",
)

In [None]:
# BigQuery configuration: fully-qualified "project.dataset.table" destination.
# Can be overridden with the BIGQUERY_TABLE environment variable (an empty
# value is treated as unset); the fallback is the original hard-coded table.
bigquery_table = os.environ.get("BIGQUERY_TABLE") or "team-plutus-iisc.location.location_visited"

In [None]:
# Function to write micro-batch to BigQuery
def write_to_bigquery(df, epoch_id):
    """Append one streaming micro-batch to the BigQuery table.

    Used as the ``foreachBatch`` callback: Spark calls it with each micro-batch
    as a static DataFrame plus the batch id.

    Args:
        df: The micro-batch DataFrame to append.
        epoch_id: Batch id supplied by Spark. It is unused, so a replayed batch
            is appended again (at-least-once delivery, no deduplication).
    """
    # NOTE(review): this staging path is the same GCS location used as the
    # stream's checkpointLocation, and the connector documents
    # temporaryGcsBucket as a bucket name — confirm a gs:// path is intended.
    sink_options = {
        "table": bigquery_table,
        "temporaryGcsBucket": "gs://visited-location/data/",
    }
    (
        df.write
        .format("com.google.cloud.spark.bigquery.v2.Spark34BigQueryTableProvider")
        .options(**sink_options)
        .mode("append")
        .save()
    )

In [None]:
# Write to BigQuery in micro-batches: every 10 seconds the newly arrived rows
# are handed to write_to_bigquery, and progress is checkpointed so a restarted
# query resumes from the last committed offsets.
# A foreachBatch sink does not forward writeStream options (other than
# checkpointLocation) to the writes it performs, so sink options such as the
# BigQuery project belong inside write_to_bigquery. The project is already
# part of the fully-qualified table name.
# NOTE(review): the checkpoint shares its GCS path with the temporary staging
# location used by write_to_bigquery; a dedicated checkpoint directory would be
# safer (changing it resets stream progress, so migrate deliberately).
query = (
    transformed_df.writeStream
    .foreachBatch(write_to_bigquery)
    .option("checkpointLocation", "gs://visited-location/data/")
    .trigger(processingTime="10 seconds")
    .start()
)

# Block until the query stops; re-raises the exception if the query failed.
query.awaitTermination()