# Aurora Forecasting - Part 02: Daily Feature Pipeline

🗒️ This notebook is divided into the following sections:

1. Initialize the Hopsworks connection.
2. Fetch the latest real-time Solar Wind data from NOAA.
3. Fetch the latest Cloud Cover forecast for Stockholm, Luleå, and Kiruna.
4. Update the Feature Groups in the Hopsworks Feature Store.

# Imports and Login

In [10]:
import pandas as pd
import datetime
import hopsworks
from config import HopsworksSettings
import util
import warnings
warnings.filterwarnings("ignore")
import numpy

# Setup settings
settings = HopsworksSettings()

print(settings.HOPSWORKS_PROJECT)

# Login to Hopsworks
project = hopsworks.login(
    project=settings.HOPSWORKS_PROJECT,
    api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value()
)
fs = project.get_feature_store()

HopsworksSettings initialized!
mac64
2026-01-11 03:16:01,182 INFO: Closing external client and cleaning up certificates.
Connection closed.
2026-01-11 03:16:01,194 INFO: Initializing external client
2026-01-11 03:16:01,194 INFO: Base URL: https://c.app.hopsworks.ai:443
2026-01-11 03:16:03,043 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1299605


# Step 1: Get Real-time Solar Wind Data

We use the NOAA SWPC API to get the most recent measurements from the DSCOVR/ACE satellites. These will serve as the features for our real-time inference.

In [11]:
print("Fetching real-time solar wind data from NOAA...")

# Uses the helper function from util.py to fetch and merge mag/plasma data
new_solar_df = util.get_noaa_realtime_hourly_data(
    settings.NOAA_MAG_URL,
    settings.NOAA_PLASMA_URL,
    settings.KP_INDEX_URL
)

# Format the time_tag for Hopsworks compatibility
#new_solar_df['time'] = new_solar_df['time'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Drop unnecessary columns (spoiler: there are some); errors='ignore' skips any that are absent
new_solar_df.drop(columns=['bx_gsm', 'lon_gsm', 'lat_gsm', 'bt', 'temperature', 'a_running', 'station_count'], inplace=True, errors='ignore')

print(f"Successfully retrieved {len(new_solar_df)} new solar wind records.")
new_solar_df

Fetching real-time solar wind data from NOAA...
Raw Magnetometer data:
     bx_gsm  by_gsm  bz_gsm  lon_gsm  lat_gsm     bt             date_and_time
0     2.26    6.22   -3.80    70.07   -29.85   7.63 2026-01-10 14:00:00+00:00
1    -1.07   -7.57   -3.31   261.94   -23.43   8.35 2026-01-10 15:00:00+00:00
2     2.81   -3.96   -5.97   305.34   -50.87   7.71 2026-01-10 16:00:00+00:00
3     3.51    1.18   -6.57    18.66   -60.59   7.73 2026-01-10 17:00:00+00:00
4     6.02    5.79   -4.84    43.88   -30.05   9.70 2026-01-10 18:00:00+00:00
5     5.30    0.20   -6.61     2.15   -51.23   8.48 2026-01-10 19:00:00+00:00
6     0.20   17.16    0.08    89.34     0.27  17.42 2026-01-10 20:00:00+00:00
7    -1.66   -9.76  -14.12   260.34   -54.98  18.28 2026-01-10 21:00:00+00:00
8    -0.46  -13.36   -9.56   268.03   -35.56  16.46 2026-01-10 22:00:00+00:00
9     9.83   -6.68    8.87   325.82    36.75  14.88 2026-01-11 00:00:00+00:00
10   10.13  -13.01   -0.79   307.92    -2.75  16.52 2026-01-11 01:00:0

Unnamed: 0,by_gsm,bz_gsm,date_and_time,density,speed,kp_index
0,6.22,-3.8,2026-01-10 14:00:00+00:00,8.67,488.2,
1,-7.57,-3.31,2026-01-10 15:00:00+00:00,7.22,483.0,3.33
2,-3.96,-5.97,2026-01-10 16:00:00+00:00,8.59,465.1,
3,1.18,-6.57,2026-01-10 17:00:00+00:00,7.51,468.6,
4,5.79,-4.84,2026-01-10 18:00:00+00:00,6.66,466.9,6.0
5,0.2,-6.61,2026-01-10 19:00:00+00:00,6.93,459.7,
6,17.16,0.08,2026-01-10 20:00:00+00:00,21.63,581.4,
7,-9.76,-14.12,2026-01-10 21:00:00+00:00,17.58,577.8,5.67
8,-13.36,-9.56,2026-01-10 22:00:00+00:00,22.04,556.9,
9,-6.68,8.87,2026-01-11 00:00:00+00:00,13.76,521.2,
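
The helper `util.get_noaa_realtime_hourly_data` is not shown in this notebook. As a rough sketch of what the hourly step might involve, the snippet below turns a header-first list of rows into an hourly-averaged DataFrame. The payload layout (first row as header, a `time_tag` column, values as strings) and the function name `swpc_rows_to_hourly` are assumptions made for illustration, not the actual implementation in `util.py`.

```python
import pandas as pd

def swpc_rows_to_hourly(rows, value_cols):
    """Convert a header-first list of rows into hourly means (hypothetical helper)."""
    header, *records = rows
    df = pd.DataFrame(records, columns=header)
    # Assumed column name: 'time_tag' holds the measurement timestamp in UTC
    df["date_and_time"] = pd.to_datetime(df["time_tag"], utc=True)
    # Values are assumed to arrive as strings; unparseable entries become NaN
    df[value_cols] = df[value_cols].apply(pd.to_numeric, errors="coerce")
    return (
        df.set_index("date_and_time")[value_cols]
        .resample("1h")
        .mean()
        .reset_index()
    )

# Made-up sample rows, only to exercise the function
sample_mag = [
    ["time_tag", "by_gsm", "bz_gsm"],
    ["2026-01-10 14:00:00.000", "6.0", "-4.0"],
    ["2026-01-10 14:30:00.000", "8.0", "-2.0"],
    ["2026-01-10 15:00:00.000", "-7.5", None],
]
hourly_mag = swpc_rows_to_hourly(sample_mag, ["by_gsm", "bz_gsm"])
print(hourly_mag)
```

Averaging within each hour leaves a missing value only when every sample in that hour is missing, which is one plausible reason a later `dropna()` is still needed.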


In [12]:
# Add dynamic pressure calculation
new_solar_df = util.calculate_dynamic_pressure(new_solar_df)
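
The body of `util.calculate_dynamic_pressure` is not shown here. The values printed later in this notebook (for example 2066401.25 for a density of 8.67 and a speed of 488.2) are consistent with `density * speed ** 2` with no unit-conversion constant, so a minimal sketch under that assumption could look like this. The function name `add_dynamic_pressure` is hypothetical.

```python
import pandas as pd

def add_dynamic_pressure(df):
    """Add a 'dynamic_pressure' column as density * speed^2 (assumed formula)."""
    out = df.copy()
    out["dynamic_pressure"] = out["density"] * out["speed"] ** 2
    return out

# Two rows taken from the hourly output shown in this notebook
sample = pd.DataFrame({"density": [8.67, 7.22], "speed": [488.2, 483.0]})
result = add_dynamic_pressure(sample)
print(result)
```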

In [13]:
# Aggregate the new solar wind data into 3-hour intervals
new_solar_aggregated_df = util.aggregate_solar_wind_3h(new_solar_df)
new_solar_aggregated_df

Unnamed: 0,window_start,window_end,by_gsm_mean,by_gsm_min,by_gsm_max,by_gsm_std,bz_gsm_mean,bz_gsm_min,bz_gsm_max,bz_gsm_std,...,density_std,speed_mean,speed_min,speed_max,speed_std,dynamic_pressure_mean,dynamic_pressure_min,dynamic_pressure_max,dynamic_pressure_std,kp_index
0,2026-01-10 15:00:00+00:00,2026-01-10 18:00:00+00:00,-3.45,-7.57,1.18,4.397238,-5.283333,-6.57,-3.31,1.735089,...,0.721965,472.233333,465.1,483.0,9.487009,1730536.375,1649090.5,1858171.75,111932.4,3.33
1,2026-01-10 18:00:00+00:00,2026-01-10 21:00:00+00:00,7.716667,0.2,17.16,8.642594,-3.79,-6.61,0.08,3.466396,...,8.566055,502.666667,459.7,581.4,68.280036,3409276.0,1451850.75,7311501.5,3379432.0,6.0
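
The output above contains only the 15:00 and 18:00 windows, and its `by_gsm` statistics match the mean, min, max, and sample standard deviation of the three hourly rows in each window. A sketch that reproduces this behaviour is given below. It assumes windows aligned to midnight UTC and that windows with fewer than three hourly rows are discarded; the name `aggregate_3h` and the `min_rows` parameter are hypothetical, and the handling of `kp_index` is left out because it cannot be inferred from the output.

```python
import pandas as pd

def aggregate_3h(df, value_cols, min_rows=3):
    """Aggregate hourly rows into 3-hour windows (illustrative sketch)."""
    grouped = df.set_index("date_and_time").resample("3h")
    stats = grouped[value_cols].agg(["mean", "min", "max", "std"])
    # Flatten the (column, statistic) MultiIndex into names like 'by_gsm_mean'
    stats.columns = [f"{col}_{stat}" for col, stat in stats.columns]
    # Keep only complete windows (assumption: three hourly rows required)
    stats = stats[grouped.size() >= min_rows]
    stats = stats.reset_index().rename(columns={"date_and_time": "window_start"})
    stats.insert(1, "window_end", stats["window_start"] + pd.Timedelta(hours=3))
    return stats

# Hourly by_gsm values for 14:00 through 22:00, copied from the output above
hourly = pd.DataFrame({
    "date_and_time": pd.date_range("2026-01-10 14:00", periods=9, freq="h", tz="UTC"),
    "by_gsm": [6.22, -7.57, -3.96, 1.18, 5.79, 0.2, 17.16, -9.76, -13.36],
})
windows = aggregate_3h(hourly, ["by_gsm"])
print(windows)
```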


In [14]:
# Drop rows with missing values and sort by window_start
new_solar_aggregated_df = new_solar_aggregated_df.dropna()
new_solar_aggregated_df = new_solar_aggregated_df.sort_values(["window_start"])
new_solar_aggregated_df = new_solar_aggregated_df.reset_index(drop=True)

new_solar_aggregated_df

Unnamed: 0,window_start,window_end,by_gsm_mean,by_gsm_min,by_gsm_max,by_gsm_std,bz_gsm_mean,bz_gsm_min,bz_gsm_max,bz_gsm_std,...,density_std,speed_mean,speed_min,speed_max,speed_std,dynamic_pressure_mean,dynamic_pressure_min,dynamic_pressure_max,dynamic_pressure_std,kp_index
0,2026-01-10 15:00:00+00:00,2026-01-10 18:00:00+00:00,-3.45,-7.57,1.18,4.397238,-5.283333,-6.57,-3.31,1.735089,...,0.721965,472.233333,465.1,483.0,9.487009,1730536.375,1649090.5,1858171.75,111932.4,3.33
1,2026-01-10 18:00:00+00:00,2026-01-10 21:00:00+00:00,7.716667,0.2,17.16,8.642594,-3.79,-6.61,0.08,3.466396,...,8.566055,502.666667,459.7,581.4,68.280036,3409276.0,1451850.75,7311501.5,3379432.0,6.0


In [15]:
# Drop the kp_index column: it is not used as a feature for inference on the real-time data
new_solar_df = new_solar_df.drop(columns=['kp_index'])
new_solar_df.dropna(inplace=True)
new_solar_df = new_solar_df.sort_values(["date_and_time"])
new_solar_df = new_solar_df.reset_index(drop=True)
new_solar_df

Unnamed: 0,by_gsm,bz_gsm,date_and_time,density,speed,dynamic_pressure
0,6.22,-3.8,2026-01-10 14:00:00+00:00,8.67,488.2,2066401.25
1,-7.57,-3.31,2026-01-10 15:00:00+00:00,7.22,483.0,1684346.625
2,-3.96,-5.97,2026-01-10 16:00:00+00:00,8.59,465.1,1858171.75
3,1.18,-6.57,2026-01-10 17:00:00+00:00,7.51,468.6,1649090.5
4,5.79,-4.84,2026-01-10 18:00:00+00:00,6.66,466.9,1451850.75
5,0.2,-6.61,2026-01-10 19:00:00+00:00,6.93,459.7,1464476.0
6,17.16,0.08,2026-01-10 20:00:00+00:00,21.63,581.4,7311501.5
7,-9.76,-14.12,2026-01-10 21:00:00+00:00,17.58,577.8,5869133.0
8,-13.36,-9.56,2026-01-10 22:00:00+00:00,22.04,556.9,6835433.0
9,-6.68,8.87,2026-01-11 00:00:00+00:00,13.76,521.2,3737896.25


# Step 3: Insert into Feature Groups

Now we push the new observations into the Feature Store. Hopsworks handles deduplication based on the primary keys defined in the backfill notebook, so rows that overlap with an earlier run should be updated rather than stored twice.
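
Because consecutive daily runs fetch overlapping time ranges, it can also help to confirm locally that a batch contains at most one row per key before inserting it. The snippet below is a minimal sketch of such a check. It assumes the primary key of the hourly feature group is `date_and_time`, which is defined in the backfill notebook and not shown here; the helper name `drop_duplicate_keys` is hypothetical.

```python
import pandas as pd

def drop_duplicate_keys(df, key_cols):
    """Keep the last row seen for each key, then sort by the key (sketch)."""
    return (
        df.drop_duplicates(subset=key_cols, keep="last")
        .sort_values(key_cols)
        .reset_index(drop=True)
    )

# Made-up batch with a repeated 14:00 timestamp
batch = pd.DataFrame({
    "date_and_time": pd.to_datetime(
        ["2026-01-10 14:00", "2026-01-10 15:00", "2026-01-10 14:00"], utc=True
    ),
    "speed": [488.2, 483.0, 490.0],
})
deduped = drop_duplicate_keys(batch, ["date_and_time"])
print(deduped)
```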

In [16]:
print("Before casting the aggregated data:\n", new_solar_aggregated_df)
# Clean and cast to correct types for Feature Store compatibility
# Convert numeric columns to float32 (Feature Store expects 'float' not 'double')
df = new_solar_aggregated_df.copy()

for col in df.columns:
    if col not in ["window_start", "window_end"]:
        df[col] = pd.to_numeric(df[col], errors='coerce').astype('float32')

new_solar_aggregated_df = df
# check data types of each column
print("After casting:\n", new_solar_aggregated_df.dtypes)
new_solar_aggregated_df

Before casting the aggregated data:
                window_start                window_end  by_gsm_mean  \
0 2026-01-10 15:00:00+00:00 2026-01-10 18:00:00+00:00    -3.450000   
1 2026-01-10 18:00:00+00:00 2026-01-10 21:00:00+00:00     7.716667   

   by_gsm_min  by_gsm_max  by_gsm_std  bz_gsm_mean  bz_gsm_min  bz_gsm_max  \
0       -7.57        1.18    4.397238    -5.283333       -6.57       -3.31   
1        0.20       17.16    8.642594    -3.790000       -6.61        0.08   

   bz_gsm_std  ...  density_std  speed_mean  speed_min  speed_max  speed_std  \
0    1.735089  ...     0.721965  472.233333      465.1      483.0   9.487009   
1    3.466396  ...     8.566055  502.666667      459.7      581.4  68.280036   

   dynamic_pressure_mean  dynamic_pressure_min  dynamic_pressure_max  \
0            1730536.375            1649090.50            1858171.75   
1            3409276.000            1451850.75            7311501.50   

   dynamic_pressure_std  kp_index  
0          1.119324e+05

Unnamed: 0,window_start,window_end,by_gsm_mean,by_gsm_min,by_gsm_max,by_gsm_std,bz_gsm_mean,bz_gsm_min,bz_gsm_max,bz_gsm_std,...,density_std,speed_mean,speed_min,speed_max,speed_std,dynamic_pressure_mean,dynamic_pressure_min,dynamic_pressure_max,dynamic_pressure_std,kp_index
0,2026-01-10 15:00:00+00:00,2026-01-10 18:00:00+00:00,-3.45,-7.57,1.18,4.397238,-5.283333,-6.57,-3.31,1.735089,...,0.721965,472.233337,465.100006,483.0,9.487009,1730536.375,1649090.5,1858171.75,111932.375,3.33
1,2026-01-10 18:00:00+00:00,2026-01-10 21:00:00+00:00,7.716667,0.2,17.16,8.642594,-3.79,-6.61,0.08,3.466396,...,8.566055,502.666656,459.700012,581.400024,68.280037,3409276.0,1451850.75,7311501.5,3379432.25,6.0


In [17]:
print("Before casting the real time data:\n", new_solar_df)
# Clean and cast to correct types for Feature Store compatibility
# Convert numeric columns to float32 (Feature Store expects 'float' not 'double')
df = new_solar_df.copy()

for col in df.columns:
    if col not in ["date_and_time"]:
        df[col] = pd.to_numeric(df[col], errors='coerce').astype('float32')

new_solar_df = df
# check data types of each column
print("After casting:\n", new_solar_df.dtypes)
new_solar_df

Before casting the real time data:
     by_gsm  bz_gsm             date_and_time  density  speed  dynamic_pressure
0     6.22   -3.80 2026-01-10 14:00:00+00:00     8.67  488.2       2066401.250
1    -7.57   -3.31 2026-01-10 15:00:00+00:00     7.22  483.0       1684346.625
2    -3.96   -5.97 2026-01-10 16:00:00+00:00     8.59  465.1       1858171.750
3     1.18   -6.57 2026-01-10 17:00:00+00:00     7.51  468.6       1649090.500
4     5.79   -4.84 2026-01-10 18:00:00+00:00     6.66  466.9       1451850.750
5     0.20   -6.61 2026-01-10 19:00:00+00:00     6.93  459.7       1464476.000
6    17.16    0.08 2026-01-10 20:00:00+00:00    21.63  581.4       7311501.500
7    -9.76  -14.12 2026-01-10 21:00:00+00:00    17.58  577.8       5869133.000
8   -13.36   -9.56 2026-01-10 22:00:00+00:00    22.04  556.9       6835433.000
9    -6.68    8.87 2026-01-11 00:00:00+00:00    13.76  521.2       3737896.250
10  -13.01   -0.79 2026-01-11 01:00:00+00:00     4.65  530.0       1306185.000
After casting:
 

Unnamed: 0,by_gsm,bz_gsm,date_and_time,density,speed,dynamic_pressure
0,6.22,-3.8,2026-01-10 14:00:00+00:00,8.67,488.200012,2066401.25
1,-7.57,-3.31,2026-01-10 15:00:00+00:00,7.22,483.0,1684346.625
2,-3.96,-5.97,2026-01-10 16:00:00+00:00,8.59,465.100006,1858171.75
3,1.18,-6.57,2026-01-10 17:00:00+00:00,7.51,468.600006,1649090.5
4,5.79,-4.84,2026-01-10 18:00:00+00:00,6.66,466.899994,1451850.75
5,0.2,-6.61,2026-01-10 19:00:00+00:00,6.93,459.700012,1464476.0
6,17.16,0.08,2026-01-10 20:00:00+00:00,21.629999,581.400024,7311501.5
7,-9.76,-14.12,2026-01-10 21:00:00+00:00,17.58,577.799988,5869133.0
8,-13.36,-9.56,2026-01-10 22:00:00+00:00,22.040001,556.900024,6835433.0
9,-6.68,8.87,2026-01-11 00:00:00+00:00,13.76,521.200012,3737896.25
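
The two casting cells above differ only in which timestamp columns they skip, so they could be folded into one helper. The following is a sketch of that refactoring, not code from `util.py`; the name `cast_features_to_float32` and its `keep_cols` parameter are hypothetical.

```python
import pandas as pd

def cast_features_to_float32(df, keep_cols):
    """Cast every column except keep_cols to float32, coercing bad values to NaN."""
    out = df.copy()
    for col in out.columns.difference(keep_cols):
        out[col] = pd.to_numeric(out[col], errors="coerce").astype("float32")
    return out

# Made-up frame mixing floats and numeric strings
sample = pd.DataFrame({
    "date_and_time": pd.to_datetime(["2026-01-10 14:00", "2026-01-10 15:00"], utc=True),
    "speed": [488.2, 483.0],
    "density": ["8.67", "7.22"],
})
casted = cast_features_to_float32(sample, ["date_and_time"])
print(casted.dtypes)
```

With this helper, the aggregated frame would pass `["window_start", "window_end"]` as `keep_cols` and the hourly frame would pass `["date_and_time"]`.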


In [18]:
# Retrieve references to the Feature Groups
solar_wind_fg = fs.get_feature_group(name="solar_wind_fg", version=8)
solar_wind_aggregated_fg = fs.get_feature_group(name="solar_wind_aggregated_fg", version=3)

# Insert new data
# Note: for real-time pipelines, the feature group is often created with
# online_enabled=True so the data is available for immediate inference.
solar_wind_fg.insert(new_solar_df)
solar_wind_aggregated_fg.insert(new_solar_aggregated_df)

print("Daily Feature Pipeline execution complete!")

Uploading Dataframe: 100.00% |██████████| Rows 11/11 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: solar_wind_fg_8_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1299605/jobs/named/solar_wind_fg_8_offline_fg_materialization/executions


Uploading Dataframe: 100.00% |██████████| Rows 2/2 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: solar_wind_aggregated_fg_3_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1299605/jobs/named/solar_wind_aggregated_fg_3_offline_fg_materialization/executions
Daily Feature Pipeline execution complete!
