In [1]:
import json
import os
import time

import pandas as pd
from kafka import KafkaConsumer
# Never truncate columns when displaying wide frames such as the weather records.
pd.options.display.max_columns = None

#### Helper that appends each consumed message as a row of `kafka_weather_updates.csv`

In [2]:
csv_file = "C:/Users/hughp/MSCAI Prog for AI/kafka_weather_updates.csv"
def log_weather_data_to_csv(message, path=None):
    """Append one consumed weather message as a CSV row and echo it.

    Parameters
    ----------
    message : dict
        A single deserialized message; its keys become the CSV columns.
    path : str, optional
        Destination CSV file. Defaults to the module-level ``csv_file``,
        looked up at call time so reassigning ``csv_file`` takes effect.

    The header row is written only when the file is missing or empty, so
    repeated calls build a single CSV with exactly one header line.

    NOTE(review): columns are written in the message's key order and the
    header is never re-checked; if a later message has different or
    reordered keys its values will not line up with the existing header.
    """
    target = csv_file if path is None else path

    # Use public os.path checks instead of the private
    # pd.io.common.file_exists helper; the size check also ensures a
    # zero-byte file still receives a header row.
    needs_header = not os.path.exists(target) or os.path.getsize(target) == 0

    pd.DataFrame([message]).to_csv(target, mode='a', index=False, header=needs_header)

    print(f"Logged data: {message}")
    print("----------")

In [9]:
def consume_weather_data(duration=60, topic='global_weather',
                         bootstrap_servers='localhost:9092',
                         group_id='weather_group', idle_timeout_ms=1000):
    """Consume weather updates from Kafka for roughly ``duration`` seconds.

    Each message value is JSON-decoded (UTF-8) and handed to
    ``log_weather_data_to_csv``.

    Parameters
    ----------
    duration : float
        Time budget in seconds. The deadline is checked after every logged
        message and whenever the consumer has been idle for
        ``idle_timeout_ms``, so it is enforced even on a quiet topic.
    topic, bootstrap_servers, group_id : str
        Kafka connection settings; defaults are the original hard-coded values.
    idle_timeout_ms : int
        Longest time one pass over the consumer may block without receiving
        a message before control returns to the deadline check.

    Returns
    -------
    int
        Number of messages logged (also printed).
    """
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id=group_id,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        # Without this the iterator blocks indefinitely when no messages
        # arrive, and the duration limit would never be evaluated.
        consumer_timeout_ms=idle_timeout_ms,
    )
    print("Consumer started, listening for weather updates...")
    # monotonic() is immune to wall-clock adjustments during the run.
    deadline = time.monotonic() + duration
    count = 0

    try:
        while time.monotonic() < deadline:
            # A pass ends either at the deadline (break) or after
            # idle_timeout_ms without a message (iterator is exhausted);
            # the while condition then re-checks the deadline.
            for message in consumer:
                log_weather_data_to_csv(message.value)
                count += 1
                # Check *after* logging. Checking first would drop the message
                # that arrived past the deadline, even though auto-commit may
                # already have moved the group offset beyond it.
                if time.monotonic() >= deadline:
                    break
        print("Stopping consumer after",duration,"seconds.")
        print(count,"many messages")
    except KeyboardInterrupt:
        print("Consumer stopped.... Keyboard interupted")
        print(count,"many messages")
    finally:
        consumer.close()
    return count

In [4]:
# Run the consumer for 60 seconds; this cell blocks the kernel until it stops.
consume_weather_data(duration=60)

Consumer started, listening for weather updates...
Logged data: {'country': 'Georgia', 'location_name': 'Tbilisi', 'latitude': 41.73, 'longitude': 44.79, 'timezone': 'Asia/Tbilisi', 'last_updated_epoch': 1715849100, 'last_updated': '2024-05-16 12:45', 'temperature_celsius': 16.0, 'condition_text': 'Overcast', 'wind_kph': 9.0, 'wind_degree': 140, 'wind_direction': 'SE', 'pressure_mb': 1020.0, 'precip_mm': 0.02, 'humidity': 68, 'cloud': 100, 'feels_like_celsius': 16.0, 'visibility_km': 10.0, 'uv_index': 3.0, 'gust_kph': 16.2, 'air_quality_Carbon_Monoxide': 208.6, 'air_quality_Ozone': 65.1, 'air_quality_Nitrogen_dioxide': 1.7, 'air_quality_Sulphur_dioxide': 0.5, 'air_quality_PM2.5': 2.0, 'air_quality_PM10': 2.4, 'air_quality_us-epa-index': 1, 'air_quality_gb-defra-index': 1, 'sunrise': '05:40 AM', 'sunset': '08:15 PM', 'moonrise': '01:13 PM', 'moonset': '02:33 AM', 'moon_phase': 'Waxing Gibbous', 'moon_illumination': 55}
----------
Logged data: {'country': 'Germany', 'location_name': 'Ber

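#### Summary of `kafka_weather_updates.csv` after running the consumer for 60 seconds
Note: the logger appends to this file on every run, so the statistics below cover every run written to it so far.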

In [5]:
# Load the CSV written by the consumer. Reuse the `csv_file` path constant
# rather than repeating the absolute path literal, so this cell always reads
# the same file the logger writes to.
Kafka_dataset = pd.read_csv(csv_file)

In [6]:
# Preview the first five consumed records.
Kafka_dataset.head(n=5)

Unnamed: 0,country,location_name,latitude,longitude,timezone,last_updated_epoch,last_updated,temperature_celsius,condition_text,wind_kph,wind_degree,wind_direction,pressure_mb,precip_mm,humidity,cloud,feels_like_celsius,visibility_km,uv_index,gust_kph,air_quality_Carbon_Monoxide,air_quality_Ozone,air_quality_Nitrogen_dioxide,air_quality_Sulphur_dioxide,air_quality_PM2.5,air_quality_PM10,air_quality_us-epa-index,air_quality_gb-defra-index,sunrise,sunset,moonrise,moonset,moon_phase,moon_illumination
0,Georgia,Tbilisi,41.73,44.79,Asia/Tbilisi,1715849100,2024-05-16 12:45,16.0,Overcast,9.0,140,SE,1020.0,0.02,68,100,16.0,10.0,3.0,16.2,208.6,65.1,1.7,0.5,2.0,2.4,1,1,05:40 AM,08:15 PM,01:13 PM,02:33 AM,Waxing Gibbous,55
1,Germany,Berlin,52.52,13.4,Europe/Berlin,1715849100,2024-05-16 10:45,22.0,Sunny,33.1,120,ESE,1011.0,0.0,38,0,23.9,10.0,6.0,40.3,223.6,86.6,4.0,5.0,6.6,7.5,1,1,05:08 AM,08:58 PM,01:03 PM,03:06 AM,Waxing Gibbous,55
2,Ghana,Accra,5.55,-0.22,Africa/Accra,1715849100,2024-05-16 08:45,29.0,Partly cloudy,9.0,280,W,1012.0,0.0,84,25,34.5,10.0,7.0,18.2,407.2,11.4,4.5,3.1,19.2,32.0,2,2,05:46 AM,06:09 PM,01:00 PM,12:49 AM,Waxing Gibbous,55
3,Greece,Athens,37.98,23.72,Europe/Athens,1715849100,2024-05-16 11:45,23.0,Partly cloudy,3.6,221,SW,1020.0,0.0,38,25,24.6,10.0,6.0,7.5,230.3,105.9,12.5,18.1,8.7,10.2,1,1,06:14 AM,08:30 PM,01:47 PM,02:52 AM,Waxing Gibbous,55
4,Grenada,Saint George's,12.05,-61.75,America/Grenada,1715849100,2024-05-16 04:45,28.0,Partly cloudy,22.0,110,ESE,1011.0,0.06,79,25,32.6,10.0,1.0,29.2,243.7,20.2,0.8,0.7,2.4,12.0,1,1,05:43 AM,06:24 PM,01:08 PM,01:08 AM,Waxing Gibbous,55


In [10]:
# Summary statistics (count, mean, std, min, quartiles, max) of the numeric columns.
Kafka_dataset.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,latitude,longitude,last_updated_epoch,temperature_celsius,wind_kph,wind_degree,pressure_mb,precip_mm,humidity,cloud,feels_like_celsius,visibility_km,uv_index,gust_kph,air_quality_Carbon_Monoxide,air_quality_Ozone,air_quality_Nitrogen_dioxide,air_quality_Sulphur_dioxide,air_quality_PM2.5,air_quality_PM10,air_quality_us-epa-index,air_quality_gb-defra-index,moon_illumination
count,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0,60.0
mean,25.414,18.050667,1715849000.0,22.356667,12.723333,157.183333,1013.433333,0.039667,63.816667,42.583333,23.701667,9.466667,4.7,18.765,481.375,66.61,13.213333,7.385,19.51,30.906667,1.483333,2.05,55.0
std,23.296808,60.220895,0.0,7.25395,8.421972,101.558281,6.479084,0.082953,23.915453,33.156175,8.733017,1.741485,2.60573,10.389382,620.579202,54.38813,24.505561,15.075098,37.849809,51.166226,0.947641,2.212637,0.0
min,-35.28,-90.53,1715849000.0,6.3,3.6,3.0,1001.0,0.0,13.0,0.0,3.8,2.0,1.0,4.4,170.2,0.2,0.0,0.1,0.5,0.5,1.0,1.0,55.0
25%,12.0,-11.5275,1715849000.0,17.0,6.1,90.0,1008.0,0.0,43.75,16.0,17.0,10.0,2.0,11.025,209.875,21.0,1.1,0.65,2.475,5.1,1.0,1.0,55.0
50%,30.575,18.73,1715849000.0,23.0,11.2,149.5,1012.0,0.0,64.5,43.5,24.6,10.0,5.0,15.95,273.7,64.75,3.85,2.4,7.2,10.85,1.0,1.0,55.0
75%,42.05,48.44,1715849000.0,26.15,19.275,224.0,1018.0,0.0225,87.25,75.0,29.225,10.0,7.0,26.4,487.3,96.175,12.9,6.375,16.025,32.025,2.0,2.0,55.0
max,63.83,169.53,1715849000.0,42.0,33.1,342.0,1029.0,0.31,100.0,100.0,45.4,13.0,10.0,52.2,3471.4,303.3,145.3,101.1,196.1,262.3,5.0,10.0,55.0


In [8]:
# NOTE(review): log_weather_data_to_csv appends to the CSV (mode='a'), so these
# figures include every run written to the file, not only the latest 60 seconds.
print("\nSummary of updates consumed in 60 seconds:")

# (label, statistic) pairs; each statistic is evaluated only when its line is
# printed, so output order and any failure point match a plain print sequence.
summary_rows = (
    ("Total records consumed:", lambda frame: len(frame)),
    ("Average Temperature (°C): ", lambda frame: frame['temperature_celsius'].mean()),
    ("Average Humidity (%):", lambda frame: frame['humidity'].mean()),
    ("Max Temperature (°C):", lambda frame: frame['temperature_celsius'].max()),
    ("Min Temperature (°C):", lambda frame: frame['temperature_celsius'].min()),
    ("Locations Consumed From: ", lambda frame: frame['location_name'].nunique()),
)
for label, statistic in summary_rows:
    print(label, statistic(Kafka_dataset))


Summary of updates consumed in 60 seconds:
Total records consumed: 60
Average Temperature (°C):  22.35666666666667
Average Humidity (%): 63.81666666666667
Max Temperature (°C): 42.0
Min Temperature (°C): 6.3
Locations Consumed From:  60
