In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, TimestampType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("KafkaSparkStreaming") \
    .getOrCreate()

# Define schema for Kafka messages
schema = StructType() \
    .add("client_host", StringType()) \
    .add("http_method", StringType()) \
    .add("url", StringType()) \
    .add("event_time", TimestampType())

In [4]:
# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "ui-event-log") \
    .option("startingOffsets", "earliest") \
    .load()

In [6]:
# Parse the value column and apply schema
parsed_df = df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

In [7]:
# Example: Transform data (simple transformation)
transformed_df = parsed_df.withColumn("processed_time", col("event_time"))


In [9]:
# **Print transformed data to console**
query = transformed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Await termination to keep the stream running
query.awaitTermination()

25/03/12 14:52:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6d3ac873-0ac1-4f5c-b1bf-58018dc6fad4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/12 14:52:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/03/12 14:52:25 ERROR MicroBatchExecution: Query [id = a41381e2-d069-4c03-8316-43f30d2501bc, runId = f6ca9acd-22c3-4769-a132-0c873c815a95] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:643)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:482)
	at org.ap

StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater
=== Streaming Query ===
Identifier: [id = a41381e2-d069-4c03-8316-43f30d2501bc, runId = f6ca9acd-22c3-4769-a132-0c873c815a95]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE

In [10]:
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "import-libraries",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import col, from_json\n",
    "from pyspark.sql.types import StructType, StringType, TimestampType\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "setup-environment",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set environment variables\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,/opt/spark/jars/kafka-clients-3.2.1.jar pyspark-shell'\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "initialize-spark-session",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initialize Spark Session\n",
    "spark = SparkSession.builder \\\n",
    "    .appName(\"KafkaSparkStreaming\") \\\n",
    "    .getOrCreate()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "define-schema",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define schema for Kafka messages\n",
    "schema = StructType() \\\n",
    "    .add(\"client_host\", StringType()) \\\n",
    "    .add(\"http_method\", StringType()) \\\n",
    "    .add(\"url\", StringType()) \\\n",
    "    .add(\"event_time\", TimestampType())\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "read-from-kafka",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read from Kafka\n",
    "df = spark.readStream \\\n",
    "    .format(\"kafka\") \\\n",
    "    .option(\"kafka.bootstrap.servers\", \"kafka:9092\") \\\n",
    "    .option(\"subscribe\", \"ui-event-log\") \\\n",
    "    .option(\"startingOffsets\", \"earliest\") \\\n",
    "    .load()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "parse-transform-data",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Parse the value column and apply schema\n",
    "parsed_df = df.selectExpr(\"CAST(value AS STRING)\") \\\n",
    "    .select(from_json(col(\"value\"), schema).alias(\"data\")) \\\n",
    "    .select(\"data.*\")\n",
    "\n",
    "# Example: Transform data (simple transformation)\n",
    "transformed_df = parsed_df.withColumn(\"processed_time\", col(\"event_time\"))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "display-transformed-data",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Display transformed data to console\n",
    "query = transformed_df.writeStream \\\n",
    "    .outputMode(\"append\") \\\n",
    "    .format(\"console\") \\\n",
    "    .start()\n",
    "\n",
    "# Await termination to keep the stream running\n",
    "query.awaitTermination()\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}

{'cells': [{'cell_type': 'code',
   'execution_count': 1,
   'id': 'import-libraries',
   'metadata': {},
   'outputs': [],
   'source': ['import os\n',
    'from pyspark.sql import SparkSession\n',
    'from pyspark.sql.functions import col, from_json\n',
    'from pyspark.sql.types import StructType, StringType, TimestampType\n']},
  {'cell_type': 'code',
   'execution_count': 2,
   'id': 'setup-environment',
   'metadata': {},
   'outputs': [],
   'source': ['# Set environment variables\n',
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/spark/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,/opt/spark/jars/kafka-clients-3.2.1.jar pyspark-shell'\n"]},
  {'cell_type': 'code',
   'execution_count': 3,
   'id': 'initialize-spark-session',
   'metadata': {},
   'outputs': [],
   'source': ['# Initialize Spark Session\n',
    'spark = SparkSession.builder \\\n',
    '    .appName("KafkaSparkStreaming") \\\n',
    '    .getOrCreate()\n']},
  {'cell_type': 'code',
   'execution_count': 4,
  