In [1]:
import sys
sys.path.append("..")  # Add parent directory to sys.path for module imports
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType, StructField, LongType, ArrayType, MapType
from config import load_config
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, ArrayType, MapType
import pandas as pd
import hashlib
from utils import extract_browser, extract_os

In [2]:
kafka_conf = load_config(filename='../config.ini',section = 'remote_kafka')

In [3]:
spark = SparkSession.builder \
    .appName("SimpleApp") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()

25/09/21 11:57:04 WARN Utils: Your hostname, sonhaile-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.101.43 instead (on interface enp5s0)
25/09/21 11:57:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sonhaile/.ivy2/cache
The jars for the packages stored in: /home/sonhaile/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6e9d4bc1-78be-4c69-9aa6-94e12ac57ce2;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 221ms :: artifacts dl 4ms


25/09/21 11:57:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
def transform(df):
    """Transform raw data by adding computed columns."""
    
    # Convert timestamp to datetime (assuming seconds since epoch)
    df['timestamp_dt'] = pd.to_datetime(df['time_stamp'], unit='s')
    
    # Create date and time columns
    df['full_date'] = df['timestamp_dt'].dt.strftime('%Y-%m-%d')
    df['full_time'] = df['timestamp_dt'].dt.strftime('%H:%M:%S')
    
    # Create hash keys
    df['sales_key'] = df.apply(lambda row: hashlib.sha256(f"{row['id']}{row['product_id']}".encode()).hexdigest(), axis=1)
    df['ip_key'] = df['ip'].apply(lambda x: hashlib.sha256(x.encode()).hexdigest() if pd.notna(x) else None)
    df['user_agent_key'] = df['user_agent'].apply(lambda x: hashlib.sha256(x.encode()).hexdigest() if pd.notna(x) else None)
    df['product_key'] = df['product_id']
    
    # Extract browser and OS from user_agent
    df['browser'] = df['user_agent'].apply(extract_browser)
    df['os'] = df['user_agent'].apply(extract_os)
    
    # Convert option array to string
    df['option'] = df['option'].apply(lambda x: str(x) if x is not None else None)
    
    return df

In [5]:
from pyspark.sql.streaming import StreamingQuery

# Alternative approach using foreachBatch for better compatibility
def process_batch(df, epoch_id):
    """Process each micro-batch of data."""
    if df.count() > 0:
        # Convert to pandas DataFrame
        pandas_df = df.toPandas()
        
        # Apply transform function
        transformed_df = transform(pandas_df)
        
        # Print transformed data (for testing)
        print(f"Batch {epoch_id} - Transformed Data Sample:")
        print(transformed_df.head(3))
        print(f"Total records in batch: {len(transformed_df)}")
        print("---")

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .options(**kafka_conf) \
    .load()

schema = StructType([
    StructField("_id", StringType()),
    StructField("time_stamp", LongType()),
    StructField("ip", StringType()),
    StructField("user_agent", StringType()),
    StructField("resolution", StringType()),
    StructField("user_id_db", StringType()),
    StructField("device_id", StringType()),
    StructField("api_version", StringType()),
    StructField("store_id", StringType()),
    StructField("local_time", StringType()),
    StructField("show_recommendation", StringType()),
    StructField("current_url", StringType()),
    StructField("referrer_url", StringType()),
    StructField("email_address", StringType()),
    StructField("recommendation", StringType()),
    StructField("utm_source", StringType()),
    StructField("utm_medium", StringType()),
    StructField("collection", StringType()),
    StructField("product_id", StringType()),
    StructField("option", ArrayType(MapType(StringType(), StringType()))),
    StructField("id", StringType())
])

# Parse JSON data
parsed_df = df.selectExpr("CAST(value AS STRING) as json_value") \
    .select(from_json(col("json_value"), schema).alias("data")) \
    .select("data.*")

# Apply transform function via foreachBatch (more compatible)
query = parsed_df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/tmp/spark_checkpoints/test_notebook") \
    .trigger(processingTime="2 seconds") \
    .start() # processingTime="2 seconds" OR once = True 

print("Streaming pipeline with transform started. Press Ctrl+C to stop.")
query.awaitTermination()

25/09/21 11:57:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Streaming pipeline with transform started. Press Ctrl+C to stop.


                                                                                

Batch 306 - Transformed Data Sample:
                        _id  time_stamp               ip  \
0  5ea2655233eacf36f489fe81  1758374260  132.147.109.139   
1  5ea26552feae1f3778c93ef7  1758374260    49.197.93.179   
2  5ea2655234103036e2461df8  1758374260  190.107.228.218   

                                          user_agent resolution user_id_db  \
0  Mozilla/5.0 (Linux; Android 10; SM-G985F) Appl...    385x854              
1  Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like...    414x736              
2  Mozilla/5.0 (Linux; Android 8.0.0; SM-G935F) A...    360x640              

                              device_id api_version store_id  \
0  044169b1-f41e-4948-995c-e7cdf4e60d0a         1.0       37   
1  70ec1df8-7028-4815-ba44-848106af5340         1.0       29   
2  3001c973-8120-4e2a-a308-a307221f4d48         1.0       85   

            local_time  ...                                    id  \
0  2025-09-20 20:17:40  ...  a510b086-b106-4d5f-b555-4d8fe7386952   
1  2025-09-20 

ERROR:root:KeyboardInterrupt while sending command.                 (0 + 3) / 3]
Traceback (most recent call last):
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/socket.py", line 716, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j

KeyboardInterrupt: 

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/pyspark/sql/utils.py", line 276, in call
    raise e
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/pyspark/sql/utils.py", line 273, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/ipykernel_1161468/1949673041.py", line 6, in process_batch
    if df.count() > 0:
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 804, in count
    return int(self._jdf.count())
  File "/home/sonhaile/miniconda3/envs/data_engineering/lib/python3.9/site-packages/py4j

25/09/21 12:02:18 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.util.concurrent.TimeoutException: Cannot fetch record for offset 3779584 in 120000 milliseconds
	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.fetch(KafkaDataConsumer.scala:97)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$fetchData$1(KafkaDataConsumer.scala:579)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.timeNanos(KafkaDataConsumer.scala:666)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.fetchData(KafkaDataConsumer.scala:579)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.fetchRecord(KafkaDataConsumer.scala:512)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:323)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsume

[Stage 3:>                                                          (0 + 2) / 3]

25/09/21 12:02:28 WARN TaskSetManager: Lost task 2.0 in stage 3.0 (TID 9) (192.168.101.43 executor driver): TaskKilled (Stage cancelled)


[Stage 3:>                                                          (0 + 1) / 3]

25/09/21 12:02:31 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 8) (192.168.101.43 executor driver): TaskKilled (Stage cancelled)
