# Storing real-time data in the raw_observations feature group


## Manual testing - Automated

In [None]:
!pip install hopsworks==4.2.*

In [None]:
!pip install confluent-kafka

In [None]:
import hopsworks
from google.colab import userdata

# Read the Hopsworks API key through Colab's `userdata` helper instead of
# hard-coding it in the notebook.
HOPSWORKS_API_KEY = userdata.get("HOPSWORKS_API_KEY")
print("API key loaded")


In [None]:
# ✅ Connect to your Hopsworks project
# Log in with the API key loaded above, then fetch the project's feature store
# handle. `fs` is the module-level object that push_to_hopsworks uses to create
# or look up feature groups.
project = hopsworks.login(api_key_value=HOPSWORKS_API_KEY)
fs = project.get_feature_store()
print("✅ Connected to Hopsworks project successfully!")


In [None]:
import pandas as pd

def push_to_hopsworks(
    df: pd.DataFrame,
    fg_name: str,
    version: int,
    primary_key: list,
    event_time: str,
    description: str,
    online_enabled: bool = False,
    *,
    feature_store=None,
):
    """Create (or fetch) a Hopsworks feature group and insert ``df`` into it.

    The event-time column is parsed into timezone-aware UTC timestamps. Rows
    whose event time is missing or cannot be parsed are dropped, and the number
    of dropped rows is printed. All cleaning happens on a copy, so the caller's
    DataFrame is left untouched. If no valid rows remain, nothing is inserted.

    Args:
        df: Rows to upload; must contain the ``event_time`` column.
        fg_name: Name of the feature group.
        version: Version of the feature group.
        primary_key: Column names that form the primary key.
        event_time: Name of the column holding the event timestamp.
        description: Human-readable description of the feature group.
        online_enabled: Passed through to the feature group as ``online_enabled``.
        feature_store: Object exposing ``get_or_create_feature_group``. When
            omitted, the notebook-level ``fs`` handle is used.

    Raises:
        KeyError: If ``event_time`` is not a column of ``df``.
    """
    store = fs if feature_store is None else feature_store

    # Work on a copy: assigning the parsed column onto ``df`` itself would
    # silently rewrite the DataFrame the caller still holds.
    frame = df.copy()
    frame[event_time] = pd.to_datetime(frame[event_time], errors="coerce", utc=True)

    # errors="coerce" turns bad timestamps into NaT; drop those rows but say so
    # instead of losing data silently.
    cleaned = frame.dropna(subset=[event_time]).reset_index(drop=True)
    dropped = len(frame) - len(cleaned)
    if dropped:
        print(f"⚠️ Dropped {dropped} row(s) with a missing or unparseable '{event_time}' value")

    if cleaned.empty:
        # Nothing valid to upload: skip the insert rather than pushing an
        # empty frame.
        print(f"⚠️ No valid rows to push to feature group: '{fg_name}' (v{version})")
        return

    # Create or get feature group
    fg = store.get_or_create_feature_group(
        name=fg_name,
        version=version,
        primary_key=primary_key,
        event_time=event_time,
        description=description,
        online_enabled=online_enabled,
    )

    # Insert data, passing the "wait_for_job" write option through to Hopsworks.
    fg.insert(cleaned, write_options={"wait_for_job": True})
    print(f"✅ Successfully pushed {len(cleaned)} records to feature group: '{fg_name}' (v{version})")


# Load realtime_aqi_weather.csv and upload it to Hopsworks (real-time data)

In [None]:
from google.colab import files

# Open Colab's file-upload prompt. The next cell reads the file by the fixed
# name "realtime_aqi_weather.csv", so the uploaded file should carry that name.
uploaded = files.upload()
# NOTE(review): this message is printed without inspecting `uploaded`, so it
# does not by itself confirm that the expected file was provided.
print("Data loaded successfully!")

In [None]:
# Read the uploaded real-time CSV and report its size (rows, then column count).
realtime_path = "realtime_aqi_weather.csv"
df_features = pd.read_csv(realtime_path)

print(f"✅ Loaded {df_features.shape[0]} real-time records from {realtime_path}")
print("Columns:", df_features.shape[1])

In [None]:
# Rename the `pm25` column to `pm2_5`; every other column is left as-is.
df_features = df_features.rename(columns={"pm25": "pm2_5"})

In [None]:
# Parse `datetime` as UTC timestamps, then derive `datetime_str` from the
# parsed values so the frame has a string key column.
df_features = df_features.assign(
    datetime=lambda frame: pd.to_datetime(frame["datetime"], utc=True),
    datetime_str=lambda frame: frame["datetime"].astype(str),  # string key
)

In [None]:
# Upload the prepared real-time frame to version 2 of the "raw_observations"
# feature group. The string timestamp column is the primary key, the parsed
# `datetime` column is the event time, and online storage is enabled.
push_to_hopsworks(
    df=df_features,
    fg_name="raw_observations",
    version=2,
    primary_key=["datetime_str"],
    event_time="datetime",
    description="Real-time AQI + Weather features (Karachi, hourly)",
    online_enabled=True,
)
