In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Create (or reuse) the Spark session for this job
spark = SparkSession.builder.appName("WeatherPrediction").getOrCreate()

try:
    # Define the schema for the CSV data. Every column is read as a string;
    # the numeric ones are cast to double right after loading.
    schema = StructType([
        StructField("city", StringType(), True),
        StructField("timestamp", StringType(), True),
        StructField("temperature", StringType(), True),
        StructField("humidity", StringType(), True),
        StructField("weather_description", StringType(), True),
        StructField("wind_speed", StringType(), True),
        StructField("cloudiness", StringType(), True)
    ])

    # Read CSV data into a Spark DataFrame with the defined schema
    csv_file_path = 'file:///C:/WeatherMonitoringSystem/weather_forecast.csv'
    weather_df = spark.read.csv(csv_file_path, header=True, schema=schema)

    # Convert temperature, humidity, wind_speed, and cloudiness columns to numeric types
    numeric_columns = ["temperature", "humidity", "wind_speed", "cloudiness"]
    for column in numeric_columns:
        weather_df = weather_df.withColumn(column, weather_df[column].cast("double"))

    # Calculate average temperature, humidity, wind_speed, and cloudiness for each city
    average_stats = weather_df.groupBy("city").agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity"),
        avg("wind_speed").alias("avg_wind_speed"),
        avg("cloudiness").alias("avg_cloudiness")
    )

    # Name the model columns once so the null filter, the assembler and the
    # regression cannot drift apart.
    label_column = "avg_temperature"
    feature_columns = ["avg_humidity", "avg_wind_speed", "avg_cloudiness"]

    # avg() yields null for a city that has no usable readings in a column.
    # VectorAssembler raises on null inputs by default, and a null label
    # cannot be used for training, so such cities are dropped up front.
    model_input = average_stats.na.drop(subset=[label_column] + feature_columns)

    # Create a feature vector by assembling the features
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features"
    )

    # Create a Linear Regression model
    lr = LinearRegression(featuresCol="features", labelCol=label_column)

    # Create a pipeline with the assembler and the linear regression model
    pipeline = Pipeline(stages=[assembler, lr])

    # Split the data into training and testing sets (fixed seed for repeatability)
    train_data, test_data = model_input.randomSplit([0.8, 0.2], seed=123)

    # Fit the model on the training data
    model = pipeline.fit(train_data)

    # Make predictions on the test data
    predictions = model.transform(test_data)

    # Show the predicted values alongside the actual values
    predictions.select("city", label_column, "prediction").show()
finally:
    # Stop the Spark session even when a step above fails, so a failed run
    # does not leave the session running.
    spark.stop()

+----------+---------------+------------------+
|      city|avg_temperature|        prediction|
+----------+---------------+------------------+
|  Edmonton|         262.48| 268.5008786961388|
|   Kelowna|         272.62| 271.7033973084038|
| Saskatoon|         263.14| 273.2763239817554|
|St. John's|         278.63|274.02695843480956|
|Whitehorse|         256.99| 274.9225148431824|
+----------+---------------+------------------+



In [1]:
import csv
import json
import os
from datetime import datetime, timedelta, timezone

import requests
from kafka import KafkaProducer

# Define your list of cities in Canada with unpredictable weather
cities = ['Toronto', 'Vancouver', 'Calgary', 'Montreal', 'Edmonton', 
          'Ottawa', 'Quebec City', 'Winnipeg', 'Halifax', 'Victoria', 
          'Regina', 'Saskatoon', 'St. John\'s', 'Hamilton', 'Kelowna', 
          'London', 'Thunder Bay', 'Charlottetown', 'Fredericton', 'Whitehorse']

# OpenWeather API key. Read from the environment so the secret never lives in
# the notebook; fail early with a clear message when it is missing.
openweather_api_key = os.environ.get('OPENWEATHER_API_KEY')
if not openweather_api_key:
    raise RuntimeError(
        "Set the OPENWEATHER_API_KEY environment variable before running this cell."
    )

# Forecast endpoint (HTTPS, so the key is not sent in clear text) and the
# maximum time to wait on a single request.
forecast_url = 'https://api.openweathermap.org/data/2.5/forecast'
request_timeout_seconds = 10

# Columns to extract for CSV file
csv_columns = ['city', 'date', 'temperature', 'humidity', 'weather_description', 'wind_speed', 'cloudiness']

# File to save the data
csv_file_path = r'C:\WeatherMonitoringSystem\predict_forecast.csv'

# Forecast timestamps are converted to UTC below, so "tomorrow" is taken in
# UTC as well. It is computed once so every city is matched against the same day.
next_day = datetime.now(timezone.utc) + timedelta(days=1)
next_day_str = next_day.strftime("%Y-%m-%d")

# Kafka producer setup
producer = KafkaProducer(bootstrap_servers='localhost:9092')

try:
    # Open the CSV file once for the whole run instead of once per row
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
        writer.writeheader()

        # Fetch data for each city and write to CSV
        for city in cities:
            try:
                # `params` lets requests URL-encode names such as "St. John's"
                response = requests.get(
                    forecast_url,
                    params={'q': f'{city},CA', 'appid': openweather_api_key},
                    timeout=request_timeout_seconds,
                )
            except requests.RequestException as exc:
                # Only the exception type is printed: the full message can
                # contain the request URL, which includes the API key.
                print(f"Failed to fetch data for {city}. Error: {type(exc).__name__}")
                continue

            if response.status_code != 200:
                print(f"Failed to fetch data for {city}. Status code: {response.status_code}")
                continue

            weather_data = response.json()

            # Extract necessary fields for CSV for the next day
            for entry in weather_data.get('list', []):
                timestamp = entry.get('dt', 0)
                date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

                # Skip entries that do not belong to the next day
                if not date.startswith(next_day_str):
                    continue

                # Fall back to an empty record when 'weather' is missing or empty
                weather = entry.get('weather') or [{}]
                extracted_data = {
                    'city': city,
                    'date': date,
                    'temperature': entry.get('main', {}).get('temp', ''),
                    'humidity': entry.get('main', {}).get('humidity', ''),
                    'weather_description': weather[0].get('description', ''),
                    'wind_speed': entry.get('wind', {}).get('speed', ''),
                    'cloudiness': entry.get('clouds', {}).get('all', '')
                }

                # Write data to CSV file
                writer.writerow(extracted_data)

                # Publish data to Kafka
                producer.send('weather_topic', json.dumps(extracted_data).encode('utf-8'))

                print(f"Data for {city} on {next_day_str} stored in CSV file and published to Kafka.")
                break  # Stop after finding the entry for the next day
            else:
                # The loop finished without finding an entry for the next day
                print(f"No forecast entry found for {city} on {next_day_str}.")
finally:
    # Close the Kafka producer even if a request or write above failed
    producer.close()

Data for Toronto on 2023-12-03 stored in CSV file and published to Kafka.
Data for Vancouver on 2023-12-03 stored in CSV file and published to Kafka.
Data for Calgary on 2023-12-03 stored in CSV file and published to Kafka.
Data for Montreal on 2023-12-03 stored in CSV file and published to Kafka.
Data for Edmonton on 2023-12-03 stored in CSV file and published to Kafka.
Data for Ottawa on 2023-12-03 stored in CSV file and published to Kafka.
Data for Quebec City on 2023-12-03 stored in CSV file and published to Kafka.
Data for Winnipeg on 2023-12-03 stored in CSV file and published to Kafka.
Data for Halifax on 2023-12-03 stored in CSV file and published to Kafka.
Data for Victoria on 2023-12-03 stored in CSV file and published to Kafka.
Data for Regina on 2023-12-03 stored in CSV file and published to Kafka.
Data for Saskatoon on 2023-12-03 stored in CSV file and published to Kafka.
Data for St. John's on 2023-12-03 stored in CSV file and published to Kafka.
Data for Hamilton on 2023