In [1]:
!pip install hopsworks==4.2.*



In [2]:
!pip install openmeteo-requests
!pip install requests-cache retry-requests numpy pandas



In [3]:
!pip install confluent-kafka



In [4]:
import os
from google.colab import userdata

os.environ["HOPSWORKS_API_KEY"] = userdata.get('HOPSWORKS_API_KEY')


In [5]:
import hopsworks
import os

project = hopsworks.login(api_key_value=os.environ["HOPSWORKS_API_KEY"])
fs = project.get_feature_store()


Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1238195


In [6]:
fg = fs.get_feature_group(name="karachi_raw_data_store", version=1)
existing_df = fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.61s) 


Unnamed: 0,temperature,humidity,wind_speed,wind_direction,hour,day,weekday,pm2_5,pm10,co,so2,o3,no2,aqi,date,date_str
0,30.700001,75.856354,59.085369,201.218048,13.0,24.0,1.0,31.158333,84.033333,253.141724,5.075465,35.65625,4.835797,91.079757,2025-06-24,2025-06-24 13:00:00
1,31.65,63.206875,66.226105,233.354263,12.0,4.0,2.0,31.8375,88.575,219.971436,4.655689,42.278126,3.188437,92.508047,2025-06-04,2025-06-04 12:00:00
2,28.799999,67.358147,36.253269,241.14444,14.0,22.0,1.0,36.529166,93.470834,446.926086,5.151787,41.259377,15.41078,115.910548,2025-04-22,2025-04-22 14:00:00
3,28.799999,85.8843,33.250671,236.929321,22.0,9.0,2.0,32.458334,90.175001,124.825058,3.281879,21.393749,8.5025,93.813663,2025-07-09,2025-07-09 22:00:00
4,27.85,86.818497,23.183554,243.435013,19.0,7.0,2.0,22.091666,54.029167,205.132095,5.342594,10.1875,18.014671,72.012517,2025-05-07,2025-05-07 19:00:00


In [7]:
import openmeteo_requests

import pandas as pd
import requests_cache
from retry_requests import retry

# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after = 3600)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)

# Make sure all required weather variables are listed here
# The order of variables in hourly or daily is important to assign them correctly below
url = "https://air-quality-api.open-meteo.com/v1/air-quality"
params = {
        "latitude": 24.8608,
        "longitude": 67.0104,
        "hourly": ["pm10", "pm2_5", "carbon_monoxide", "carbon_dioxide", "nitrogen_dioxide", "sulphur_dioxide", "ozone"],
        "current": ["pm10", "pm2_5", "carbon_monoxide", "nitrogen_dioxide", "sulphur_dioxide", "ozone"],
        "timezone": "Pacific/Auckland",
        "past_days": 92,
        "forecast_days": 1
}
responses = openmeteo.weather_api(url, params=params)

# Process first location. Add a for-loop for multiple locations or weather models
response = responses[0]
print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
print(f"Elevation {response.Elevation()} m asl")
print(f"Timezone {response.Timezone()}{response.TimezoneAbbreviation()}")
print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

# Current values. The order of variables needs to be the same as requested.
current = response.Current()
current_pm10 = current.Variables(0).Value()
current_pm2_5 = current.Variables(1).Value()
current_carbon_monoxide = current.Variables(2).Value()
current_nitrogen_dioxide = current.Variables(3).Value()
current_sulphur_dioxide = current.Variables(4).Value()
current_ozone = current.Variables(5).Value()

print(f"Current time {current.Time()}")
print(f"Current pm10 {current_pm10}")
print(f"Current pm2_5 {current_pm2_5}")
print(f"Current carbon_monoxide {current_carbon_monoxide}")
print(f"Current nitrogen_dioxide {current_nitrogen_dioxide}")
print(f"Current sulphur_dioxide {current_sulphur_dioxide}")
print(f"Current ozone {current_ozone}")

# Process hourly data. The order of variables needs to be the same as requested.
hourly = response.Hourly()
hourly_pm10 = hourly.Variables(0).ValuesAsNumpy()
hourly_pm2_5 = hourly.Variables(1).ValuesAsNumpy()
hourly_carbon_monoxide = hourly.Variables(2).ValuesAsNumpy()
hourly_carbon_dioxide = hourly.Variables(3).ValuesAsNumpy()
hourly_nitrogen_dioxide = hourly.Variables(4).ValuesAsNumpy()
hourly_sulphur_dioxide = hourly.Variables(5).ValuesAsNumpy()
hourly_ozone = hourly.Variables(6).ValuesAsNumpy()

hourly_data = {"date": pd.date_range(
        start = pd.to_datetime(hourly.Time(), unit = "s", utc = True),
        end = pd.to_datetime(hourly.TimeEnd(), unit = "s", utc = True),
        freq = pd.Timedelta(seconds = hourly.Interval()),
        inclusive = "left"
)}

hourly_data["pm10"] = hourly_pm10
hourly_data["pm2_5"] = hourly_pm2_5
hourly_data["carbon_monoxide"] = hourly_carbon_monoxide
hourly_data["carbon_dioxide"] = hourly_carbon_dioxide
hourly_data["nitrogen_dioxide"] = hourly_nitrogen_dioxide
hourly_data["sulphur_dioxide"] = hourly_sulphur_dioxide
hourly_data["ozone"] = hourly_ozone

air_quality_dataframe = pd.DataFrame(data = hourly_data)
print(air_quality_dataframe)

Coordinates 24.900001525878906°N 67.0°E
Elevation 8.0 m asl
Timezone b'Pacific/Auckland'b'GMT+12'
Timezone difference to GMT+0 43200 s
Current time 1753257600
Current pm10 80.0999984741211
Current pm2_5 27.5
Current carbon_monoxide 200.0
Current nitrogen_dioxide 3.0999999046325684
Current sulphur_dioxide 10.399999618530273
Current ozone 78.0
                          date        pm10      pm2_5  carbon_monoxide  \
0    2025-04-21 12:00:00+00:00  128.500000  44.200001            378.0   
1    2025-04-21 13:00:00+00:00  110.400002  43.000000            468.0   
2    2025-04-21 14:00:00+00:00   95.500000  43.099998            572.0   
3    2025-04-21 15:00:00+00:00   92.699997  41.799999            638.0   
4    2025-04-21 16:00:00+00:00   92.000000  41.400002            640.0   
...                        ...         ...        ...              ...   
2227 2025-07-23 07:00:00+00:00   78.699997  27.100000            192.0   
2228 2025-07-23 08:00:00+00:00   80.099998  27.500000           

In [8]:
# Define the conversion factors from µg/m³ to ppb for each gas
conversion_factors = {
    'carbon_monoxide': {'factor_ppb': 28.01 / 24.45, 'target_unit_ppb': 'ppb'},
    'nitrogen_dioxide': {'factor_ppb': 46.01 / 24.45, 'target_unit_ppb': 'ppb'},
    'sulphur_dioxide': {'factor_ppb': 64.07 / 24.45, 'target_unit_ppb': 'ppb'},
    'ozone': {'factor_ppb': 48.00 / 24.45, 'target_unit_ppb': 'ppb'},
    'carbon_dioxide': {'factor_ppb': 44.01 / 24.45, 'target_unit_ppb': 'ppb'}
}

# Convert units and create new columns
# PM2.5 and PM10 are in μg/m³. Calculate 24-hour rolling averages.
air_quality_dataframe['PM2.5 (μg/m³) 24h'] = air_quality_dataframe['pm2_5'].rolling(window=24, min_periods=1).mean()
air_quality_dataframe['PM10 (μg/m³) 24h'] = air_quality_dataframe['pm10'].rolling(window=24, min_periods=1).mean()


# Convert gases to ppb and calculate rolling averages where required for AQI
if 'carbon_monoxide' in air_quality_dataframe.columns:
    # Convert µg/m³ to ppb
    air_quality_dataframe['Carbon Monoxide (ppb) Hourly'] = air_quality_dataframe['carbon_monoxide'] / conversion_factors['carbon_monoxide']['factor_ppb']
    # Convert CO ppb to ppm (1 ppm = 1000 ppb) and calculate 8-hour rolling average
    air_quality_dataframe['CO (ppm) 8h'] = (air_quality_dataframe['Carbon Monoxide (ppb) Hourly'] / 1000).rolling(window=8, min_periods=1).mean()

if 'nitrogen_dioxide' in air_quality_dataframe.columns:
    # Convert µg/m³ to ppb and store as 1h average
    air_quality_dataframe['NO2 (ppb) 1h'] = air_quality_dataframe['nitrogen_dioxide'] / conversion_factors['nitrogen_dioxide']['factor_ppb']


if 'sulphur_dioxide' in air_quality_dataframe.columns:
    # Convert µg/m³ to ppb
    air_quality_dataframe['Sulphur Dioxide (ppb) Hourly'] = air_quality_dataframe['sulphur_dioxide'] / conversion_factors['sulphur_dioxide']['factor_ppb']
    # Store as 1h average. EPA AQI for SO2 is based on 1-hour.
    air_quality_dataframe['SO2 (ppb) 1h'] = air_quality_dataframe['Sulphur Dioxide (ppb) Hourly']
    # Calculate 24-hour rolling average for SO2 as per requested column name, though 1h is used for AQI
    air_quality_dataframe['SO2 (ppb) 24h'] = air_quality_dataframe['Sulphur Dioxide (ppb) Hourly'].rolling(window=24, min_periods=1).mean()


if 'ozone' in air_quality_dataframe.columns:
    # Convert µg/m³ to ppb
    air_quality_dataframe['Ozone (ppb) Hourly'] = air_quality_dataframe['ozone'] / conversion_factors['ozone']['factor_ppb']
    # Store as 1h average
    air_quality_dataframe['O3 (ppb) 1h'] = air_quality_dataframe['Ozone (ppb) Hourly']
    # Calculate 8-hour rolling average
    air_quality_dataframe['O3 (ppb) 8h'] = air_quality_dataframe['Ozone (ppb) Hourly'].rolling(window=8, min_periods=1).mean()

if 'carbon_dioxide' in air_quality_dataframe.columns:
    # Convert µg/m³ to ppb (CO2 is not used for standard EPA AQI, but converting for completeness)
    air_quality_dataframe['Carbon Dioxide (ppb) Hourly'] = air_quality_dataframe['carbon_dioxide'] / conversion_factors['carbon_dioxide']['factor_ppb']


# Display the updated dataframe with new columns
display(air_quality_dataframe.head())

Unnamed: 0,date,pm10,pm2_5,carbon_monoxide,carbon_dioxide,nitrogen_dioxide,sulphur_dioxide,ozone,PM2.5 (μg/m³) 24h,PM10 (μg/m³) 24h,Carbon Monoxide (ppb) Hourly,CO (ppm) 8h,NO2 (ppb) 1h,Sulphur Dioxide (ppb) Hourly,SO2 (ppb) 1h,SO2 (ppb) 24h,Ozone (ppb) Hourly,O3 (ppb) 1h,O3 (ppb) 8h,Carbon Dioxide (ppb) Hourly
0,2025-04-21 12:00:00+00:00,128.5,44.200001,378.0,451.0,13.4,20.1,139.0,44.200001,128.5,329.957153,0.329957,7.120843,7.670439,7.670439,7.670439,70.803123,70.803123,70.803123,250.555557
1,2025-04-21 13:00:00+00:00,110.400002,43.0,468.0,452.0,20.4,18.299999,118.0,43.6,119.450001,408.518372,0.369238,10.840687,6.983534,6.983534,7.326987,60.106251,60.106251,65.454687,251.111115
2,2025-04-21 14:00:00+00:00,95.5,43.099998,572.0,454.0,30.1,15.8,88.0,43.433333,111.466667,499.300232,0.412592,15.995327,6.0295,6.0295,6.894491,44.825001,44.825001,58.578125,252.222229
3,2025-04-21 15:00:00+00:00,92.699997,41.799999,638.0,456.0,36.599998,13.8,66.0,43.025,106.775,556.911804,0.448672,19.449467,5.266272,5.266272,6.487436,33.618752,33.618752,52.338282,253.333344
4,2025-04-21 16:00:00+00:00,92.0,41.400002,640.0,458.0,37.299999,12.9,58.0,42.7,103.82,558.657593,0.470669,19.821451,4.922819,4.922819,6.174513,29.543749,29.543749,47.779375,254.444458


In [9]:
import numpy as np
import pandas as pd

def calculate_epa_aqi(pollutant, concentration, unit):
    """
    Calculates the US EPA Air Quality Index (AQI) for a given pollutant concentration.

    Args:
        pollutant (str): The name of the pollutant (e.g., 'PM2.5', 'PM10', 'CO', 'SO2', 'O3', 'NO2').
        concentration (float or np.ndarray or pd.Series): The pollutant concentration.
        unit (str): The unit of the concentration ('ug/m3' or 'ppb' or 'ppm').

    Returns:
        float or np.ndarray: The calculated AQI value(s). Returns NaN if pollutant or unit is invalid.
    """

    # EPA AQI Breakpoints and corresponding AQI values
    # Source: https://www.airnow.gov/sites/default/files/2020-05/aqi-calculator-download_0.xlsx
    # Note: This is a simplified version and might not cover all averaging periods or nuances
    # of the official EPA calculator.
    aqi_breakpoints = {
        'PM2.5': [(0.0, 12.0, 0, 50), (12.1, 35.4, 51, 100), (35.5, 55.4, 101, 150), (55.5, 150.4, 151, 200), (150.5, 250.4, 201, 300), (250.5, 350.4, 301, 400), (350.5, 500.4, 401, 500)], # 24-hour average
        'PM10': [(0, 54, 0, 50), (55, 154, 51, 100), (155, 254, 101, 150), (255, 354, 151, 200), (355, 424, 201, 300), (425, 504, 301, 400), (505, 604, 401, 500)], # 24-hour average
        'CO': [(0.0, 4.4, 0, 50), (4.5, 9.4, 51, 100), (9.5, 12.4, 101, 150), (12.5, 15.4, 151, 200), (15.5, 30.4, 201, 300), (30.5, 40.4, 301, 400), (40.5, 50.4, 401, 500)], # 8-hour average in ppm
        'SO2': [(0.0, 35, 0, 50), (36, 75, 51, 100), (76, 185, 101, 150), (186, 304, 151, 200), (305, 604, 201, 300), (605, 804, 301, 400), (805, 1004, 401, 500)], # 1-hour average in ppb
        'O3': [(0.0, 54, 0, 50), (55, 70, 51, 100), (71, 85, 101, 150), (86, 105, 151, 200), (106, 200, 201, 300)], # 8-hour average in ppb
        'NO2': [(0, 53, 0, 50), (54, 100, 51, 100), (101, 360, 101, 150), (361, 649, 151, 200), (650, 1249, 201, 300), (1250, 1649, 301, 400), (1650, 2049, 401, 500)] # 1-hour average in ppb
    }

    if pollutant not in aqi_breakpoints:
        print(f"Warning: No AQI breakpoints found for pollutant: {pollutant}")
        return np.nan

    breakpoints = aqi_breakpoints[pollutant]

    if pollutant in ['O3', 'SO2', 'NO2'] and unit.lower() != 'ppb':
         print(f"Warning: {pollutant} AQI calculation requires ppb, but got {unit}")
         return np.nan
    elif pollutant == 'CO' and unit.lower() != 'ppm':
         print(f"Warning: {pollutant} AQI calculation requires ppm, but got {unit}")
         return np.nan
    elif pollutant in ['PM2.5', 'PM10'] and unit.lower() != 'ug/m3':
         print(f"Warning: {pollutant} AQI calculation requires ug/m3, but got {unit}")
         return np.nan

    # Convert pandas Series to numpy array for consistent processing
    if isinstance(concentration, pd.Series):
        concentration = concentration.values

    aqi_values = []
    if isinstance(concentration, np.ndarray):
        for c in concentration:
            aqi = np.nan
            for c_low, c_high, i_low, i_high in breakpoints:
                if c_low <= c and c <= c_high: # Modified comparison
                    aqi = ((i_high - i_low) / (c_high - c_low)) * (c - c_low) + i_low
                    break
            aqi_values.append(aqi)
        return np.array(aqi_values)
    else:
        aqi = np.nan
        for c_low, c_high, i_low, i_high in breakpoints:
            if c_low <= concentration and concentration <= c_high: # Modified comparison
                aqi = ((i_high - i_low) / (c_high - c_low)) * (concentration - c_low) + i_low
                break
        return aqi

In [10]:
# Print column names to debug KeyError
print(air_quality_dataframe.columns)

air_quality_dataframe['Calculated AQI PM2.5'] = calculate_epa_aqi('PM2.5', air_quality_dataframe['PM2.5 (μg/m³) 24h'], 'ug/m3')
air_quality_dataframe['Calculated AQI PM10'] = calculate_epa_aqi('PM10', air_quality_dataframe['PM10 (μg/m³) 24h'], 'ug/m3')
air_quality_dataframe['Calculated AQI CO'] = calculate_epa_aqi('CO', air_quality_dataframe['CO (ppm) 8h'], 'ppm')
air_quality_dataframe['Calculated AQI SO2'] = calculate_epa_aqi('SO2', air_quality_dataframe['SO2 (ppb) 1h'], 'ppb')
# For Ozone, use the 8-hour average breakpoints with the 1-hour data as per the column name provided
air_quality_dataframe['Calculated AQI O3'] = calculate_epa_aqi('O3', air_quality_dataframe['O3 (ppb) 8h'], 'ppb')
air_quality_dataframe['Calculated AQI NO2'] = calculate_epa_aqi('NO2', air_quality_dataframe['NO2 (ppb) 1h'], 'ppb')

# Calculate the overall calculated AQI as the maximum of the individual pollutant AQI values
pollutant_aqi_columns = ['Calculated AQI PM2.5', 'Calculated AQI PM10', 'Calculated AQI CO',
                         'Calculated AQI SO2', 'Calculated AQI O3', 'Calculated AQI NO2']
air_quality_dataframe[pollutant_aqi_columns] = air_quality_dataframe[pollutant_aqi_columns].astype(float) # Ensure the columns are numeric
air_quality_dataframe['Calculated Overall AQI'] = air_quality_dataframe[pollutant_aqi_columns].max(axis=1)

Index(['date', 'pm10', 'pm2_5', 'carbon_monoxide', 'carbon_dioxide',
       'nitrogen_dioxide', 'sulphur_dioxide', 'ozone', 'PM2.5 (μg/m³) 24h',
       'PM10 (μg/m³) 24h', 'Carbon Monoxide (ppb) Hourly', 'CO (ppm) 8h',
       'NO2 (ppb) 1h', 'Sulphur Dioxide (ppb) Hourly', 'SO2 (ppb) 1h',
       'SO2 (ppb) 24h', 'Ozone (ppb) Hourly', 'O3 (ppb) 1h', 'O3 (ppb) 8h',
       'Carbon Dioxide (ppb) Hourly'],
      dtype='object')


In [11]:
# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after = -1)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)

# Define the URL for the historical weather archive API
url = "https://archive-api.open-meteo.com/v1/archive"

# Define the parameters for the API request
params = {
        "latitude": 24.8608,
        "longitude": 67.0104,
        "start_date": "2025-04-16",
        "end_date": "2025-07-17",
        "hourly": ["temperature_2m", "relative_humidity_2m", "rain", "wind_speed_10m", "wind_direction_10m", "wind_speed_100m", "wind_direction_100m"],
        "timezone": "Pacific/Auckland"
}

# Make the API request
responses = openmeteo.weather_api(url, params=params)

# Process first location. Add a for-loop for multiple locations or weather models
response = responses[0]
print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
print(f"Elevation {response.Elevation()} m asl")
print(f"Timezone {response.Timezone()}{response.TimezoneAbbreviation()}")
print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

# Process hourly data. The order of variables needs to be the same as requested.
hourly = response.Hourly()
hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
hourly_relative_humidity_2m = hourly.Variables(1).ValuesAsNumpy()
hourly_rain = hourly.Variables(2).ValuesAsNumpy()
hourly_wind_speed_10m = hourly.Variables(3).ValuesAsNumpy()
hourly_wind_direction_10m = hourly.Variables(4).ValuesAsNumpy()
hourly_wind_speed_100m = hourly.Variables(5).ValuesAsNumpy()
hourly_wind_direction_100m = hourly.Variables(6).ValuesAsNumpy()

# Create a dictionary hourly_data to store the date and the extracted hourly weather variables
hourly_data = {"date": pd.date_range(
        start = pd.to_datetime(hourly.Time(), unit = "s", utc = True),
        end = pd.to_datetime(hourly.TimeEnd(), unit = "s", utc = True),
        freq = pd.Timedelta(seconds = hourly.Interval()),
        inclusive = "left"
)}

hourly_data["temperature_2m"] = hourly_temperature_2m
hourly_data["relative_humidity_2m"] = hourly_relative_humidity_2m
hourly_data["rain"] = hourly_rain
hourly_data["wind_speed_10m"] = hourly_wind_speed_10m
hourly_data["wind_direction_10m"] = hourly_wind_direction_10m
hourly_data["wind_speed_100m"] = hourly_wind_speed_100m
hourly_data["wind_direction_100m"] = hourly_wind_direction_100m

# Create a pandas DataFrame named hourly_dataframe from the hourly_data dictionary
hourly_dataframe = pd.DataFrame(data = hourly_data)


Coordinates 24.850614547729492°N 66.99248504638672°E
Elevation 8.0 m asl
Timezone b'Pacific/Auckland'b'GMT+12'
Timezone difference to GMT+0 43200 s
                       date  temperature_2m  relative_humidity_2m  rain  \
0 2025-04-15 12:00:00+00:00       30.150000             71.382370   0.0   
1 2025-04-15 13:00:00+00:00       29.450001             73.210899   0.0   
2 2025-04-15 14:00:00+00:00       28.350000             81.123108   0.0   
3 2025-04-15 15:00:00+00:00       27.950001             83.532425   0.0   
4 2025-04-15 16:00:00+00:00       27.600000             85.512299   0.0   

   wind_speed_10m  wind_direction_10m  wind_speed_100m  wind_direction_100m  
0       16.412603          248.781952        23.794067           251.016678  
1       11.874544          255.963730        17.337091           257.406677  
2       11.165805          249.227661        16.507088           251.564957  
3       11.914042          243.047897        18.596710           244.179062  
4       11.

In [12]:
# Convert wind speed from m/s to km/h (1 m/s = 3.6 km/h)
hourly_dataframe['wind_speed_10m'] = hourly_dataframe['wind_speed_10m'] * 3.6
hourly_dataframe['wind_speed_100m'] = hourly_dataframe['wind_speed_100m'] * 3.6


Unnamed: 0,date,temperature_2m,relative_humidity_2m,rain,wind_speed_10m,wind_direction_10m,wind_speed_100m,wind_direction_100m
0,2025-04-15 12:00:00+00:00,30.15,71.38237,0.0,59.085369,248.781952,85.658638,251.016678
1,2025-04-15 13:00:00+00:00,29.450001,73.210899,0.0,42.74836,255.96373,62.413528,257.406677
2,2025-04-15 14:00:00+00:00,28.35,81.123108,0.0,40.196896,249.227661,59.425514,251.564957
3,2025-04-15 15:00:00+00:00,27.950001,83.532425,0.0,42.890549,243.047897,66.948158,244.179062
4,2025-04-15 16:00:00+00:00,27.6,85.512299,0.0,41.401062,252.699387,66.847717,253.673065


In [13]:
# Extract hour, day, and weekday from the 'date' column
hourly_dataframe['hour'] = hourly_dataframe['date'].dt.hour
hourly_dataframe['day'] = hourly_dataframe['date'].dt.day
hourly_dataframe['weekday'] = hourly_dataframe['date'].dt.weekday # Monday=0, Sunday=6

# Calculate the temperature change from the previous hour
hourly_dataframe['temperature_change_1h'] = hourly_dataframe['temperature_2m'].diff()

# Calculate the 24-hour rolling average of 'relative_humidity_2m'
hourly_dataframe['relative_humidity_2m_24h'] = hourly_dataframe['relative_humidity_2m'].rolling(window=24, min_periods=1).mean()

Unnamed: 0,date,temperature_2m,relative_humidity_2m,rain,wind_speed_10m,wind_direction_10m,wind_speed_100m,wind_direction_100m,hour,day,weekday,temperature_change_1h,relative_humidity_2m_24h
0,2025-04-15 12:00:00+00:00,30.15,71.38237,0.0,59.085369,248.781952,85.658638,251.016678,12,15,1,,71.38237
1,2025-04-15 13:00:00+00:00,29.450001,73.210899,0.0,42.74836,255.96373,62.413528,257.406677,13,15,1,-0.699999,72.296635
2,2025-04-15 14:00:00+00:00,28.35,81.123108,0.0,40.196896,249.227661,59.425514,251.564957,14,15,1,-1.1,75.238792
3,2025-04-15 15:00:00+00:00,27.950001,83.532425,0.0,42.890549,243.047897,66.948158,244.179062,15,15,1,-0.4,77.312201
4,2025-04-15 16:00:00+00:00,27.6,85.512299,0.0,41.401062,252.699387,66.847717,253.673065,16,15,1,-0.35,78.95222


Unnamed: 0,date,temperature_2m,relative_humidity_2m,rain,wind_speed_10m,wind_direction_10m,wind_speed_100m,wind_direction_100m,hour,day,weekday,temperature_change_1h,relative_humidity_2m_24h
2227,2025-07-17 07:00:00+00:00,31.35,67.858482,0.1,47.873119,219.507645,65.716911,220.601212,7,17,3,-0.1,74.010454
2228,2025-07-17 08:00:00+00:00,31.700001,66.920807,0.2,52.484001,212.905243,71.890045,213.976593,8,17,3,0.35,74.277959
2229,2025-07-17 09:00:00+00:00,31.049999,71.117538,0.2,59.308819,213.863724,82.672707,213.814667,9,17,3,-0.650002,74.674809
2230,2025-07-17 10:00:00+00:00,30.9,71.942528,0.1,58.319996,216.86998,81.519325,216.596725,10,17,3,-0.15,75.09784
2231,2025-07-17 11:00:00+00:00,30.799999,71.925026,0.0,56.972427,213.87085,79.09848,214.992096,11,17,3,-0.1,75.443882


In [14]:
# Merge the two dataframes on the 'date' column
merged_dataframe = pd.merge(hourly_dataframe, air_quality_dataframe, on='date', how='inner')


Unnamed: 0,date,temperature_2m,relative_humidity_2m,rain,wind_speed_10m,wind_direction_10m,wind_speed_100m,wind_direction_100m,hour,day,...,O3 (ppb) 1h,O3 (ppb) 8h,Carbon Dioxide (ppb) Hourly,Calculated AQI PM2.5,Calculated AQI PM10,Calculated AQI CO,Calculated AQI SO2,Calculated AQI O3,Calculated AQI NO2,Calculated Overall AQI
0,2025-04-21 12:00:00+00:00,33.400002,29.573647,0.0,54.482121,243.130264,82.448921,246.860519,12,21,...,70.803123,70.803123,250.555557,122.422112,87.378788,3.749513,10.95777,,6.717776,122.422112
1,2025-04-21 13:00:00+00:00,31.15,46.241257,0.0,45.239494,231.981003,73.155159,235.465271,13,21,...,60.106251,65.454687,251.111115,120.944725,82.899495,4.195884,9.976477,85.151978,10.227063,120.944725
2,2025-04-21 14:00:00+00:00,29.450001,56.498466,0.0,36.432354,231.499313,67.048424,235.234726,14,21,...,44.825001,58.578125,252.222229,120.534338,78.948148,4.688545,8.613571,62.688542,15.089931,120.534338
3,2025-04-21 15:00:00+00:00,28.700001,60.295498,0.0,34.8055,245.820892,69.56575,242.241547,15,21,...,33.618752,52.338282,253.333344,119.528894,76.62601,5.098544,7.523245,48.461372,18.348553,119.528894
4,2025-04-21 16:00:00+00:00,28.200001,64.214607,0.0,39.293598,256.651276,83.047714,249.443863,16,21,...,29.543749,47.779375,254.444458,118.728643,75.163434,5.348512,7.032598,44.240162,18.699482,118.728643


In [16]:
# Keep only the required columns
merged_dataframe = merged_dataframe[[
    'date',
    'temperature_2m',
    'relative_humidity_2m',
    'wind_speed_10m',
    'wind_direction_10m',
    'hour',
    'day',
    'weekday',
    'PM2.5 (μg/m³) 24h',
    'PM10 (μg/m³) 24h',
    'Carbon Monoxide (ppb) Hourly',
    'Sulphur Dioxide (ppb) Hourly',
    'Ozone (ppb) Hourly',
    'NO2 (ppb) 1h',
    'Calculated Overall AQI'
]]

# Rename columns
merged_dataframe.rename(columns={
    'temperature_2m': 'temperature',
    'relative_humidity_2m': 'humidity',
    'wind_speed_10m': 'wind_speed',
    'wind_direction_10m': 'wind_direction',
    'PM2.5 (μg/m³) 24h': 'pm2_5',
    'PM10 (μg/m³) 24h': 'pm10',
    'Carbon Monoxide (ppb) Hourly': 'co',
    'Sulphur Dioxide (ppb) Hourly': 'so2',
    'Ozone (ppb) Hourly': 'o3',
    'NO2 (ppb) 1h': 'no2',
    'Calculated Overall AQI': 'aqi'
}, inplace=True)

# Print updated columns
print(merged_dataframe.columns)

Index(['date', 'temperature', 'humidity', 'wind_speed', 'wind_direction',
       'hour', 'day', 'weekday', 'pm2_5', 'pm10', 'co', 'so2', 'o3', 'no2',
       'aqi'],
      dtype='object')


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [17]:
# Convert 'date' columns to datetime objects and ensure they are in UTC
def ensure_utc(df, column):
    df[column] = pd.to_datetime(df[column])
    if df[column].dt.tz is None:
        # Column is timezone-naive, localize to UTC
        df[column] = df[column].dt.tz_localize('UTC')
    else:
        # Column is already timezone-aware, convert to UTC
        df[column] = df[column].dt.tz_convert('UTC')
    return df

merged_dataframe = ensure_utc(merged_dataframe, 'date')
existing_df = ensure_utc(existing_df, 'date')

# Set 'date' as index for both dataframes before merging
merged_indexed = merged_dataframe.set_index('date')
existing_indexed = existing_df.set_index('date')

# Merge the dataframes based on the index, prioritizing merged_indexed
final_df = merged_indexed.combine_first(existing_indexed)

# Reset the index to make 'date' a column again
final_df = final_df.reset_index()

# Display the first few rows of the final dataframe
display(final_df.head())

Unnamed: 0,date,aqi,co,date_str,day,hour,humidity,no2,o3,pm10,pm2_5,so2,temperature,weekday,wind_direction,wind_speed
0,2025-04-20 00:00:00+00:00,114.378559,521.122803,2025-04-20 14:00:00,20.0,14.0,49.064171,15.091936,45.334373,153.966665,40.933333,5.075465,29.950001,6.0,235.864014,46.19017
1,2025-04-20 00:00:00+00:00,119.313441,364.87326,2025-04-20 19:00:00,20.0,19.0,61.050877,10.096718,36.165627,137.4875,42.9375,3.396363,27.049999,6.0,290.658997,42.244431
2,2025-04-20 00:00:00+00:00,118.954354,135.299896,2025-04-20 23:00:00,20.0,23.0,56.560349,4.570094,35.65625,132.325001,42.791666,2.862104,26.049999,6.0,294.519623,40.59708
3,2025-04-20 00:00:00+00:00,110.503029,254.014633,2025-04-20 12:00:00,20.0,12.0,9.752054,5.951749,65.199997,174.199997,37.799999,5.304433,36.650002,6.0,266.820221,58.409924
4,2025-04-20 00:00:00+00:00,118.95025,182.43663,2025-04-20 21:00:00,20.0,21.0,64.94915,6.642578,36.674999,133.18,42.789999,2.862104,26.0,6.0,307.794006,40.181225


In [22]:
import pandas as pd
import hopsworks
from hsfs.feature import Feature
from hsfs.feature_group import FeatureGroup
from google.colab import userdata
from hsfs.client.exceptions import FeatureStoreException


# Step 2: Add string version of timestamp for online primary key
# Check if 'date_str' already exists, if not, create it. This column is needed for the online feature store.
if 'date_str' not in final_df.columns:
    # Ensure the 'date' column is in datetime format before creating 'date_str'
    final_df['date'] = pd.to_datetime(final_df['date'])
    final_df["date_str"] = final_df["date"].dt.strftime("%Y-%m-%d %H:%M:%S")
else:
    # Ensure the 'date' column is in datetime format
    final_df['date'] = pd.to_datetime(final_df['date'])
    # Fill any nulls in 'date_str' by formatting the 'date' column
    final_df['date_str'] = final_df['date_str'].fillna(final_df['date'].dt.strftime("%Y-%m-%d %H:%M:%S"))


# Handle null values in 'date_str' by dropping rows with nulls in this column
final_df.dropna(subset=['date_str'], inplace=True)

# Step 4: Upload raw feature set to Resources (optional)
# Retrieve the API key from Colab secrets
api_key = userdata.get('HOPSWORKS_API_KEY')

# Login to Hopsworks using the API key
project = hopsworks.login(api_key_value=api_key)

fs = project.get_feature_store()
dataset_api = project.get_dataset_api()
final_df.to_csv("karachi_merged_data_aqi.csv", index=False)
dataset_api.upload("karachi_merged_data_aqi.csv", "Resources", overwrite=True)

# Step 5: Convert numeric columns to float64
# Identify numeric columns excluding 'date' and 'date_str'
numeric_cols = final_df.select_dtypes(include='number').columns.tolist()
final_df[numeric_cols] = final_df[numeric_cols].astype('float64')


# Step 6: Convert 'date' column to Python date objects to match schema
final_df['date'] = pd.to_datetime(final_df['date']).dt.date


# Step 7: Define schema to match 'date' as a date (not timestamp)
feature_group_schema = []
for col in final_df.columns:
    if col == 'date':
        feature_group_schema.append(Feature(name=col, type="date"))  # <- fix here
    elif col == 'date_str':
         feature_group_schema.append(Feature(name=col, type="string"))
    elif pd.api.types.is_numeric_dtype(final_df[col]):
        feature_group_schema.append(Feature(name=col, type="double"))
    else:
        feature_group_schema.append(Feature(name=col, type="string"))


# Step 8: Get feature group and insert data with overwrite
feature_group_name = "karachi_raw_data_store"
feature_group_version = 1

try:
    fg = fs.get_feature_group(name=feature_group_name, version=feature_group_version)
    print("Using existing feature group")
except FeatureStoreException:
    print("Creating new feature group")
    # Create new feature group if it doesn't exist
    fg = fs.create_feature_group(
        name=feature_group_name,
        version=feature_group_version,
        description="Final features for Karachi AQI model (online + offline) - Overwritten",
        primary_key=["date_str"],
        event_time="date", # Use 'date' column as event time
        features=feature_group_schema,
        online_enabled=True
    )
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    fg = None # Set fg to None if an unexpected error occurs

if fg is not None:
    try:
        # Insert data with overwrite=True
        fg.insert(final_df, write_options={"wait_for_job": True, "overwrite": True})
        print("Data inserted and overwritten successfully.")
        print(fg)
        print(fg._feature_group_engine.__class__.__name__)
    except Exception as e:
        print(f"Error inserting data into feature group: {e}")
else:
    print("Feature group operation failed. Please check your Hopsworks connection and parameters.")

Connection closed.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1238195


Uploading /content/karachi_merged_data_aqi.csv: 0.000%|          | 0/1052135 elapsed<00:00 remaining<?

Using existing feature group


Uploading Dataframe: 100.00% |██████████| Rows 4113/4113 | Elapsed Time: 00:05 | Remaining Time: 00:00


Launching job: karachi_raw_data_store_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1238195/jobs/named/karachi_raw_data_store_1_offline_fg_materialization/executions
Data inserted and overwritten successfully.
<hsfs.feature_group.FeatureGroup object at 0x7d39c5af1a90>
FeatureGroupEngine
