### Imports and Hopsworks login


In [1]:
import io
import requests
import pandas as pd
import numpy as np
import hopsworks
import os
import warnings
from dotenv import load_dotenv

# Load environment variables (e.g. from a local .env file) so the API key is never hardcoded.
load_dotenv()

# Authenticate against the EU-West Hopsworks cluster with the key taken from the environment.
api_key = os.getenv("HOPSWORKS_API_KEY")
project = hopsworks.login(
    api_key_value=api_key,
    project="EarthTamagotchi",
    host="eu-west.cloud.hopsworks.ai",
)

# Handle to the project's feature store; later cells fetch feature groups from it.
fs = project.get_feature_store()

# Suppress all Python warnings for the rest of the notebook session.
warnings.filterwarnings("ignore")


2025-12-22 14:33:27,881 INFO: Initializing external client
2025-12-22 14:33:27,882 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2025-12-22 14:33:29,147 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/2177


## Get references to the Feature Groups


In [2]:
# Feature group handles; the version must match the one written by the backfill notebook.
FG_VERSION = 1

co2_fg = fs.get_feature_group(name="global_co2", version=FG_VERSION)
temp_fg = fs.get_feature_group(name="global_temperature", version=FG_VERSION)


## 🌫 Retrieve Latest CO₂ Data from NOAA GML


In [3]:
NOAA_CO2_URL = "https://gml.noaa.gov/webdata/ccgg/trends/co2/co2_mm_mlo.txt"
HTTP_TIMEOUT_S = 60  # fail loudly instead of hanging forever if the server stalls

response = requests.get(NOAA_CO2_URL, timeout=HTTP_TIMEOUT_S)
response.raise_for_status()

# The NOAA file starts with commented header lines beginning with '#'; keep only non-empty data rows.
data_lines = [ln for ln in response.text.splitlines() if ln.strip() and not ln.startswith("#")]

# Columns in the Mauna Loa file (see header in the NOAA text):
# year, month, decimal_date, average, trend, #days, st.dev, unc. of mon mean
# sep=r"\s+" replaces delim_whitespace=True, which is deprecated since pandas 2.2.
co2_df = pd.read_csv(
    io.StringIO("\n".join(data_lines)),
    sep=r"\s+",
    header=None,
    names=["year", "month", "decimal_date", "average", "trend", "ndays", "stdev", "average_unc"],
)

# Build a proper (timezone-naive) datetime on the first day of each month
co2_df["date"] = pd.to_datetime(
    {
        "year": co2_df["year"].astype(int),
        "month": co2_df["month"].astype(int),
        "day": 1,
    }
)

# Replace NOAA missing value markers (-99.99 or -9.99) with NaN.
# np.nan (rather than pd.NA) keeps the column numeric, so the float32 cast cannot fail on NA objects.
for col in ["average", "trend", "average_unc"]:
    co2_df[col] = co2_df[col].replace([-99.99, -9.99], np.nan).astype("float32")

# Drop months without a measured average and keep a tidy, date-sorted subset of columns
co2_df = (
    co2_df.dropna(subset=["average"])
    [["date", "average", "trend", "average_unc"]]
    .sort_values("date")
    .reset_index(drop=True)
)

# Read the stored history to find the newest month already in the feature store
historical_co2_df = co2_fg.read()
if len(historical_co2_df) > 0:
    historical_co2_df["date"] = pd.to_datetime(historical_co2_df["date"])
    # Normalize to timezone-naive UTC so it compares with the naive NOAA dates built above
    if historical_co2_df["date"].dt.tz is not None:
        historical_co2_df["date"] = historical_co2_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)
    latest_date = historical_co2_df["date"].max()
    print(f"Latest date in feature store: {latest_date.date()}")

    # Keep only months strictly after the newest stored month
    co2_new_df = co2_df[co2_df["date"] > latest_date].copy()

    if co2_new_df.empty:
        print("No new CO₂ data to add")
        co2_new_df = pd.DataFrame()  # Empty DataFrame
    else:
        print(f"Found {len(co2_new_df)} new month(s) of CO₂ data")
else:
    print("No historical data found in feature store - this should not happen if backfill ran first")
    co2_new_df = pd.DataFrame()  # Empty DataFrame
    historical_co2_df = pd.DataFrame()  # Empty DataFrame for consistency

co2_new_df


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.26s) 
Latest date in feature store: 2025-11-01
No new CO₂ data to add


In [4]:
if len(co2_new_df) > 0:
    # Re-read the stored history: lags/rolling means need up to 12 months before each new row
    historical_co2_df = co2_fg.read()
    historical_co2_df["date"] = pd.to_datetime(historical_co2_df["date"])
    # Strip timezone info: convert to UTC, then drop the tz. pd.to_datetime(..., utc=False) on a
    # tz-aware series keeps the timezone, which would break the concat/sort with naive dates below.
    if historical_co2_df["date"].dt.tz is not None:
        historical_co2_df["date"] = historical_co2_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)
    historical_co2_df = historical_co2_df.sort_values("date").reset_index(drop=True)

    # Ensure new data dates are also timezone-naive
    co2_new_df["date"] = pd.to_datetime(co2_new_df["date"])
    if co2_new_df["date"].dt.tz is not None:
        co2_new_df["date"] = co2_new_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)

    # Combine historical and new data so lags/rolls for new rows can look back into history
    base_cols = ["date", "average", "trend", "average_unc"]
    combined_co2_df = (
        pd.concat([historical_co2_df[base_cols], co2_new_df[base_cols]], ignore_index=True)
        .sort_values("date")
        .reset_index(drop=True)
    )

    # Add lag features for the main target series (average CO₂) and for the trend
    for k in [1, 2, 3, 6, 12]:
        combined_co2_df[f"average_lag_{k}"] = combined_co2_df["average"].shift(k)
        combined_co2_df[f"trend_lag_{k}"] = combined_co2_df["trend"].shift(k)

    # Add rolling means over the average and trend series
    # IMPORTANT: shift by 1 so rolling windows use only *past* months (no leakage of current month)
    shifted_avg = combined_co2_df["average"].shift(1)
    shifted_trend = combined_co2_df["trend"].shift(1)
    combined_co2_df["average_roll_3"] = shifted_avg.rolling(window=3).mean()
    combined_co2_df["average_roll_12"] = shifted_avg.rolling(window=12).mean()
    combined_co2_df["trend_roll_3"] = shifted_trend.rolling(window=3).mean()
    combined_co2_df["trend_roll_12"] = shifted_trend.rolling(window=12).mean()

    # Extract only the new rows (with all features calculated), then drop rows that
    # don't have full history for all lags/rolls
    co2_new_df = combined_co2_df[combined_co2_df["date"].isin(co2_new_df["date"])].copy()
    co2_new_df = co2_new_df.dropna().reset_index(drop=True)

    # Time-based features for trend modeling, with cyclical (sin/cos) month encoding for seasonality
    co2_new_df["year"] = co2_new_df["date"].dt.year
    co2_new_df["month"] = co2_new_df["date"].dt.month
    co2_new_df["month_sin"] = np.sin(2 * np.pi * co2_new_df["month"] / 12)
    co2_new_df["month_cos"] = np.cos(2 * np.pi * co2_new_df["month"] / 12)

    # Normalize year using the year range of the stored history (falls back to deriving it from dates)
    hist_years = (
        historical_co2_df["year"]
        if "year" in historical_co2_df.columns
        else historical_co2_df["date"].dt.year
    )
    year_min, year_max = hist_years.min(), hist_years.max()
    # NOTE(review): this range is taken from whatever is currently stored, so it grows as new years
    # are inserted; rows of the same year inserted on different runs can get different
    # year_normalized values. Pinning the backfill's year_min/year_max as constants would make it
    # stable — TODO confirm against the backfill notebook.
    year_span = (year_max - year_min) or 1  # avoid division by zero if history spans a single year
    co2_new_df["year_normalized"] = (co2_new_df["year"] - year_min) / year_span
    # Polynomial year term to capture acceleration
    co2_new_df["year_normalized_squared"] = co2_new_df["year_normalized"] ** 2
    # Interaction terms: year * seasonality
    co2_new_df["year_month_sin"] = co2_new_df["year_normalized"] * co2_new_df["month_sin"]
    co2_new_df["year_month_cos"] = co2_new_df["year_normalized"] * co2_new_df["month_cos"]

    print(f"Prepared {len(co2_new_df)} new CO₂ record(s) with features")
    # A bare .head() inside an if-block is not the cell's last expression and would not render
    display(co2_new_df.head())
else:
    print("No new CO₂ data to process")


No new CO₂ data to process


## 🌡️ Retrieve Latest Temperature Data from NASA GISS


In [5]:
# Fetch NASA GISTEMP global land–ocean monthly temperature anomalies (GLB.Ts+dSST)
GISTEMP_URL = "https://data.giss.nasa.gov/gistemp/tabledata_v4/GLB.Ts+dSST.csv"

# Timeout so a stalled server fails the run instead of hanging it forever
response = requests.get(GISTEMP_URL, timeout=60)
response.raise_for_status()

# Expected monthly columns in the GISTEMP table, and their month numbers
month_cols = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
month_map = {name: number for number, name in enumerate(month_cols, start=1)}

# Skip the first descriptive line so the header row is used; "***" marks missing (not yet published)
# months and is parsed directly as NaN, so the month columns come out numeric
wide_df = pd.read_csv(io.StringIO(response.text), skiprows=1, na_values="***")
# Strip any whitespace from column names
wide_df.columns = [c.strip() for c in wide_df.columns]
wide_df = wide_df[["Year"] + month_cols]

# Reshape to long format (one row per year/month) and drop missing months
long_df = (
    wide_df.melt(id_vars="Year", value_vars=month_cols, var_name="month", value_name="temp_anomaly")
    .dropna(subset=["temp_anomaly"])
    .copy()
)

long_df["Year"] = long_df["Year"].astype(int)
long_df["month_num"] = long_df["month"].map(month_map)

# Build a proper (timezone-naive) datetime on the first day of each month
long_df["date"] = pd.to_datetime({
    "year": long_df["Year"],
    "month": long_df["month_num"],
    "day": 1,
})

# Convert anomaly to float (values are anomalies in °C)
long_df["temp_anomaly"] = long_df["temp_anomaly"].astype("float32")

# Final tidy DataFrame
temp_df = long_df[["date", "temp_anomaly"]].sort_values("date").reset_index(drop=True)

# Read the stored history to find the newest month already in the feature store
historical_temp_df = temp_fg.read()
if len(historical_temp_df) > 0:
    historical_temp_df["date"] = pd.to_datetime(historical_temp_df["date"])
    # Strip timezone info: convert to UTC, then drop the tz. pd.to_datetime(..., utc=False) keeps the
    # timezone, and comparing tz-aware with the naive dates above would raise a TypeError.
    if historical_temp_df["date"].dt.tz is not None:
        historical_temp_df["date"] = historical_temp_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)
    latest_date = historical_temp_df["date"].max()
    print(f"Latest date in feature store: {latest_date.date()}")

    # Keep only months strictly after the newest stored month
    temp_new_df = temp_df[temp_df["date"] > latest_date].copy()

    if temp_new_df.empty:
        print("No new temperature data to add")
        temp_new_df = pd.DataFrame()  # Empty DataFrame
    else:
        print(f"Found {len(temp_new_df)} new month(s) of temperature data")
else:
    print("No historical data found in feature store - this should not happen if backfill ran first")
    temp_new_df = pd.DataFrame()  # Empty DataFrame

temp_new_df


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.35s) 
Latest date in feature store: 2025-11-01
No new temperature data to add


In [6]:
if len(temp_new_df) > 0:
    # Re-read the stored history: lags/rolling means need up to 12 months before each new row
    historical_temp_df = temp_fg.read()
    historical_temp_df["date"] = pd.to_datetime(historical_temp_df["date"])
    # Strip timezone info: convert to UTC, then drop the tz. pd.to_datetime(..., utc=False) on a
    # tz-aware series keeps the timezone, which would break the concat/sort with naive dates below.
    if historical_temp_df["date"].dt.tz is not None:
        historical_temp_df["date"] = historical_temp_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)
    historical_temp_df = historical_temp_df.sort_values("date").reset_index(drop=True)

    # Ensure new data dates are also timezone-naive
    temp_new_df["date"] = pd.to_datetime(temp_new_df["date"])
    if temp_new_df["date"].dt.tz is not None:
        temp_new_df["date"] = temp_new_df["date"].dt.tz_convert("UTC").dt.tz_localize(None)

    # Combine historical and new data so lags/rolls for new rows can look back into history
    combined_temp_df = (
        pd.concat(
            [historical_temp_df[["date", "temp_anomaly"]], temp_new_df[["date", "temp_anomaly"]]],
            ignore_index=True,
        )
        .sort_values("date")
        .reset_index(drop=True)
    )

    # Add lag features for the main target series (temperature anomaly)
    for k in [1, 2, 3, 6, 12]:
        combined_temp_df[f"temp_anomaly_lag_{k}"] = combined_temp_df["temp_anomaly"].shift(k)

    # Add rolling means over the temperature anomaly series
    # IMPORTANT: shift by 1 so the rolling window uses only *past* months (no leakage of current month)
    shifted_temp = combined_temp_df["temp_anomaly"].shift(1)
    combined_temp_df["temp_anomaly_roll_3"] = shifted_temp.rolling(window=3).mean()
    combined_temp_df["temp_anomaly_roll_12"] = shifted_temp.rolling(window=12).mean()

    # Time-based features for temperature trend modeling, with cyclical (sin/cos) month encoding
    combined_temp_df["year"] = combined_temp_df["date"].dt.year
    combined_temp_df["month"] = combined_temp_df["date"].dt.month
    combined_temp_df["month_sin"] = np.sin(2 * np.pi * combined_temp_df["month"] / 12)
    combined_temp_df["month_cos"] = np.cos(2 * np.pi * combined_temp_df["month"] / 12)

    # Normalize year using the year range of the stored history (falls back to deriving it from dates)
    hist_years_temp = (
        historical_temp_df["year"]
        if "year" in historical_temp_df.columns
        else historical_temp_df["date"].dt.year
    )
    year_min_temp, year_max_temp = hist_years_temp.min(), hist_years_temp.max()
    # NOTE(review): this range is taken from whatever is currently stored, so it grows as new years
    # are inserted; rows of the same year inserted on different runs can get different
    # year_normalized values. Pinning the backfill's range as constants would make it stable —
    # TODO confirm against the backfill notebook.
    year_span_temp = (year_max_temp - year_min_temp) or 1  # avoid division by zero for a single year
    combined_temp_df["year_normalized"] = (combined_temp_df["year"] - year_min_temp) / year_span_temp
    combined_temp_df["year_normalized_squared"] = combined_temp_df["year_normalized"] ** 2
    combined_temp_df["year_month_sin"] = combined_temp_df["year_normalized"] * combined_temp_df["month_sin"]
    combined_temp_df["year_month_cos"] = combined_temp_df["year_normalized"] * combined_temp_df["month_cos"]

    # Extract only the new rows (with all features calculated), then drop rows that
    # don't have full history for all lags/rolls
    temp_new_df = combined_temp_df[combined_temp_df["date"].isin(temp_new_df["date"])].copy()
    temp_new_df = temp_new_df.dropna().reset_index(drop=True)

    print(f"Prepared {len(temp_new_df)} new temperature record(s) with features")
    # A bare .head() inside an if-block is not the cell's last expression and would not render
    display(temp_new_df.head())
else:
    print("No new temperature data to process")


No new temperature data to process


## Uploading new data to the Feature Store


In [7]:
# Insert new CO₂ data if available
if len(co2_new_df) > 0:
    # Skip validation report saving to avoid Hopsworks server errors (500 error when saving report).
    # wait=True matches the temperature insert; presumably it blocks until the ingestion job has
    # finished, so the rows are visible to the next run's read — confirm in the Hopsworks docs.
    co2_fg.insert(co2_new_df, wait=True, validation_options={"save_report": False})
    print(f"Inserted {len(co2_new_df)} new CO₂ record(s)")
else:
    print("No new CO₂ data to insert")


No new CO‚ÇÇ data to insert


In [8]:
# Upload the newly engineered temperature rows, if there are any.
# The validation report is not saved, because saving it caused a Hopsworks 500 server error;
# wait=True presumably blocks until the write completes (confirm in the Hopsworks docs).
n_new_temp = len(temp_new_df)
if n_new_temp == 0:
    print("No new temperature data to insert")
else:
    temp_fg.insert(temp_new_df, wait=True, validation_options={"save_report": False})
    print(f"Inserted {n_new_temp} new temperature record(s)")


No new temperature data to insert
