In [1]:
from confluent_kafka import Producer
# from kafka import Producer

import requests
import json

class WeatherKafkaProducer:
    """
    Kafka Producer: Fetches weather data from OpenWeather API and publishes to Kafka.

    Parameters
    ----------
    kafka_broker : str
        Value passed as ``bootstrap.servers`` to the Kafka producer.
    topic : str
        Kafka topic the weather payloads are produced to.
    api_key : str
        OpenWeather API key, sent as the ``appid`` query parameter.
    city : str
        City name, sent as the ``q`` query parameter.
    """

    # Upper bound (seconds) for the HTTP call, so a stalled connection cannot
    # block the caller forever.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, kafka_broker, topic, api_key, city):
        # Local import keeps this class self-contained within its notebook cell.
        from urllib.parse import quote, urlencode

        self.topic = topic
        # Percent-encode the query values so names containing spaces, '&' or
        # '#' cannot corrupt the URL; HTTPS keeps the API key off the wire in
        # clear text.
        query = urlencode({"q": city, "appid": api_key}, quote_via=quote)
        self.api_url = f"https://api.openweathermap.org/data/2.5/weather?{query}"
        self.producer = Producer({'bootstrap.servers': kafka_broker})

    def fetch_weather_data(self):
        """
        Fetch weather data from OpenWeather API.

        Returns
        -------
        dict or None
            The decoded JSON body on success; ``None`` if the request fails,
            times out, returns a non-2xx status, or the body is not valid JSON
            (the error is printed, not raised).
        """
        try:
            # Step 1: Send GET request to the OpenWeather API (bounded by a timeout)
            response = requests.get(self.api_url, timeout=self.REQUEST_TIMEOUT_SECONDS)
            # Step 2: Raise an exception for any non-2xx HTTP status code
            response.raise_for_status()
            # Step 3: Parse the response JSON and return it
            return response.json()
        except Exception as e:
            # Step 4: Best-effort: report the failure and let the caller decide
            print(f"Error fetching weather data: {e}")
            return None

    def publish_to_kafka(self, data):
        """
        Publish weather data to Kafka topic.

        The payload is JSON-serialised and produced with the fixed key
        ``"weather"``. The success message is printed only when no delivery
        error was reported; any failure is printed, not raised.

        Parameters
        ----------
        data : object
            JSON-serialisable payload (as returned by ``fetch_weather_data``).
        """
        # produce() only enqueues the message; the broker's verdict arrives
        # through this callback, which the client invokes while flush() runs.
        delivery_errors = []

        def _on_delivery(err, msg):
            if err is not None:
                delivery_errors.append(err)

        try:
            # Step 1: Produce message to Kafka topic
            self.producer.produce(
                self.topic,
                key="weather",
                value=json.dumps(data),
                on_delivery=_on_delivery,
            )
            # Step 2: Flush the producer buffer (ensure all messages are sent)
            self.producer.flush()
            # Step 3: Surface asynchronous delivery failures instead of
            # reporting success unconditionally
            if delivery_errors:
                raise RuntimeError(f"message delivery failed: {delivery_errors[0]}")
            # Step 4: Print success message
            print(f"Published weather data to Kafka topic: {self.topic}")
        except Exception as e:
            # Step 5: Handle any errors that occurred during message publishing
            print(f"Error publishing to Kafka: {e}")


In [2]:
from confluent_kafka import Consumer, KafkaException, KafkaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import json
import os

class KafkaSparkProcessor:
    """
    Spark Processor: Consumes data from Kafka, processes it, and saves to CSV.

    Parameters
    ----------
    kafka_broker : str
        Value passed as ``bootstrap.servers`` to the Kafka consumer.
    group_id : str
        Kafka consumer group id.
    topic : str
        Topic to subscribe to.
    output_path : str
        Destination handed to Spark's CSV writer (opened in append mode).
    """
    def __init__(self, kafka_broker, group_id, topic, output_path):
        # Local import: ArrayType is not among the notebook's existing
        # pyspark imports, and this keeps the class self-contained.
        from pyspark.sql.types import ArrayType

        self.topic = topic
        self.output_path = output_path

        # Initialize Spark Session
        self.spark = SparkSession.builder \
            .appName("WeatherKafkaSparkProcessor") \
            .getOrCreate()

        # Initialize Kafka Consumer
        self.consumer = Consumer({
            'bootstrap.servers': kafka_broker,
            'group.id': group_id,
            'auto.offset.reset': 'earliest'
        })
        self.consumer.subscribe([self.topic])

        # Define Weather Data Schema.
        # OpenWeather returns "weather" as a JSON *array* of condition objects,
        # so it must be declared as an array of structs; declaring it as a
        # plain struct makes the field fail to parse and "description" is lost.
        self.weather_schema = StructType([
            StructField("name", StringType(), True),
            StructField("main", StructType([
                StructField("temp", DoubleType(), True),
                StructField("pressure", IntegerType(), True),
                StructField("humidity", IntegerType(), True)
            ]), True),
            StructField("wind", StructType([
                StructField("speed", DoubleType(), True)
            ]), True),
            StructField("weather", ArrayType(StructType([
                StructField("description", StringType(), True)
            ])), True)
        ])

    def close(self):
        """
        Close the Kafka consumer held by this processor.

        The Spark session is left running because it was obtained with
        ``getOrCreate()`` and may be shared with other code in the notebook.
        """
        self.consumer.close()

    def _drain_messages(self, poll_timeout):
        """
        Poll the consumer until a poll returns no message; return the decoded payloads.

        Raises
        ------
        KafkaException
            For any consumer error other than end-of-partition.
        """
        messages = []
        while True:
            msg = self.consumer.poll(poll_timeout)
            if msg is None:
                # Nothing arrived within the timeout: treat the batch as complete
                break
            elif msg.error():
                # End-of-partition is informational; anything else is fatal
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    raise KafkaException(msg.error())
            else:
                payload = msg.value()
                # A message may carry a null value (e.g. a tombstone); skip it
                # instead of crashing on .decode()
                if payload is not None:
                    messages.append(payload.decode('utf-8'))
        return messages

    def consume_and_process(self, poll_timeout=1.0):
        """
        Consume messages from Kafka, process with Spark, and save to CSV.

        Parameters
        ----------
        poll_timeout : float, optional
            Seconds each ``poll`` waits for a message (default ``1.0``). The
            batch ends at the first poll that returns nothing.

        Raises
        ------
        KafkaException
            For any consumer error other than end-of-partition.
        """
        messages = self._drain_messages(poll_timeout)

        if messages:
            # Convert JSON messages to Spark DataFrame
            rdd = self.spark.sparkContext.parallelize(messages)
            df = self.spark.read.json(rdd, schema=self.weather_schema)

            # Extract and Transform Data
            df_parsed = df.select(
                col("name").alias("city"),
                col("main.temp").alias("temperature"),
                col("main.pressure").alias("pressure"),
                col("main.humidity").alias("humidity"),
                col("wind.speed").alias("wind_speed"),
                # First (primary) condition entry of the "weather" array.
                # NOTE(review): assumes the array is non-empty; confirm how an
                # empty array should be handled if Spark ANSI mode is enabled.
                col("weather").getItem(0).getField("description").alias("description")
            )

            # Save to CSV
            df_parsed.write.mode("append").csv(self.output_path)
            print("Processed and saved data to CSV.")

In [3]:
import time

class WeatherETLWorkflow:
    """
    ETL Workflow: wires a WeatherKafkaProducer to a KafkaSparkProcessor and
    drives them in a repeating fetch -> publish -> consume cycle.
    """
    def __init__(self, kafka_broker, topic, api_key, city, output_path):
        consumer_group = "weather-consumer-group"
        self.producer = WeatherKafkaProducer(kafka_broker, topic, api_key, city)
        self.processor = KafkaSparkProcessor(kafka_broker, consumer_group, topic, output_path)

    def _run_cycle(self):
        """Perform one pass: fetch, publish if something was fetched, then consume."""
        payload = self.producer.fetch_weather_data()
        if payload:
            self.producer.publish_to_kafka(payload)
        self.processor.consume_and_process()

    def run(self, interval=10):
        """
        Run the ETL pipeline until interrupted.

        Each iteration performs one cycle and then sleeps for ``interval``
        seconds. A KeyboardInterrupt ends the loop with a message; any other
        exception propagates to the caller.
        """
        try:
            while True:
                self._run_cycle()
                # Pause before the next iteration
                time.sleep(interval)
        except KeyboardInterrupt:
            print("ETL workflow stopped by user.")



In [4]:

if __name__ == "__main__":
    import os

    # Configuration Parameters
    KAFKA_BROKER = "localhost:9092"
    TOPIC = "weather_topic"
    # The API key is a credential and must not live in the notebook source:
    # read it from the environment and fail fast with a clear message if unset.
    API_KEY = os.environ.get("OPENWEATHER_API_KEY")
    if not API_KEY:
        raise RuntimeError(
            "Set the OPENWEATHER_API_KEY environment variable before running the workflow."
        )
    # City name sent to the weather API as the query (e.g. "London")
    CITY = "Weihai"
    OUTPUT_PATH = "data/real_time_weatherData_kafka.csv"

    # Initialize and Run Workflow (loops until interrupted from the keyboard)
    workflow = WeatherETLWorkflow(KAFKA_BROKER, TOPIC, API_KEY, CITY, OUTPUT_PATH)
    workflow.run(interval=10)


24/12/22 22:14:40 WARN Utils: Your hostname, zhangmins-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.194 instead (on interface en0)
24/12/22 22:14:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/22 22:14:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/22 22:14:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Published weather data to Kafka topic: weather_topic
Published weather data to Kafka topic: weather_topic


                                                                                

Processed and saved data to CSV.
Published weather data to Kafka topic: weather_topic
ETL workflow stopped by user.
