In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=071bdd471ba74281e993f2878ba8001989a94b5eb438943886f74deb1c662903
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("AdvertiseX Data Processing") \
    .getOrCreate()

# Define schema for ad impressions JSON data
impressions_schema = StructType() \
    .add("ad_creative_id", StringType()) \
    .add("user_id", StringType()) \
    .add("timestamp", StringType()) \
    .add("website", StringType())

# Read ad impressions data from Kafka topic
impressions_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ad_impressions_topic") \
    .load()

# Convert JSON data to DataFrame
impressions_df = impressions_stream_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", impressions_schema).alias("data")) \
    .select("data.*")

# Define schema for clicks/conversions CSV data
clicks_schema = StructType() \
    .add("timestamp", StringType()) \
    .add("user_id", StringType()) \
    .add("campaign_id", StringType()) \
    .add("conversion_type", StringType())

# Read clicks/conversions data from Kafka topic
clicks_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks_conversions_topic") \
    .load()

# Convert CSV data to DataFrame
clicks_df = clicks_stream_df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", clicks_schema).alias("data")) \
    .select("data.*")

# Process data (e.g., join ad impressions with clicks/conversions)
processed_data_df = impressions_df.join(clicks_df, "user_id")

# Write processed data to an output sink (e.g., console for testing)
query = processed_data_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait for the streaming query to terminate
query.awaitTermination()


AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.