# Feature Backfill Pipeline: KP, Solar Wind and Weather

This notebook builds a reproducible feature backfill pipeline for geomagnetic activity modeling.
It ingests, cleans, validates, and stores historical features from multiple physical domains:

- Geomagnetic indices (KP)
- Solar wind and IMF parameters (OMNI / CDAS)
- Terrestrial weather data (Sweden)

Each data source is ingested independently using its natural temporal coverage and later aligned
through feature views. The output of this notebook is a set of versioned feature groups suitable
for training and inference workflows.
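The alignment itself happens in feature views, which this notebook does not define. As a local illustration only, the sketch below uses a date-keyed pandas join to show what aligning two sources with different temporal coverage amounts to. The frames `kp` and `weather` and all values in them are made up for the example.

```python
import pandas as pd

# Toy frames standing in for two independently ingested sources (values are invented).
kp = pd.DataFrame({
    "date": pd.date_range("2020-01-01", periods=4, freq="D"),
    "ap": [4.0, 7.0, 5.0, 12.0],
})
weather = pd.DataFrame({
    "date": pd.date_range("2020-01-02", periods=4, freq="D"),
    "cloud_cover_mean": [96.6, 53.0, 19.5, 77.0],
})

# Inner join: keep only the dates present in both sources.
aligned = kp.merge(weather, on="date", how="inner")

# Outer join with an indicator column: shows which source is missing on which date.
coverage = kp.merge(weather, on="date", how="outer", indicator=True)

print(aligned)
print(coverage[["date", "_merge"]])
```

An inner join drops dates covered by only one source, which is why each source can keep its own natural coverage at ingestion time.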

In [2]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Step up to the repository root if the notebook was started in notebooks/ or notebooks/aurora
    if root_dir.parts[-1:] == ('aurora',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: C:\Users\lppap\Documents\master\scalable_ML\id2223-project
HopsworksSettings initialized!


## Imports

In [3]:
import datetime
import json
import os
import re
import warnings
from pathlib import Path

import hopsworks
import pandas as pd
import requests
from cdasws import CdasWs

from mlfs.aurora import util

warnings.filterwarnings("ignore")

## Hopsworks login & Connect

In [4]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store() 

2026-01-05 19:04:51,138 INFO: Initializing external client
2026-01-05 19:04:51,140 INFO: Base URL: https://c.app.hopsworks.ai:443






2026-01-05 19:04:53,204 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279154


## DB1 – KP Historical Data

This section ingests historical KP index data, which represents global geomagnetic activity.

KP is used as a core explanatory and/or target variable in geomagnetic and auroral models.
The data is treated as a daily-resolution, validated historical source suitable for training.

### Objective

Load, clean, and validate historical KP data and persist it as a standalone feature group.
This dataset defines the temporal backbone for downstream feature alignment.


### 1. Data Retrieval

In [5]:
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

# Update csv file
util.update_kp_csv()

csv_file = f"{root_dir}/data/kpdata.csv"
util.check_file_path(csv_file)

kp_yesterday_df = util.get_kp(csv_file, yesterday)
kp_yesterday_df.head()

File successfully found at the path: C:\Users\lppap\Documents\master\scalable_ML\id2223-project/data/kpdata.csv


In [5]:
df = pd.read_csv(csv_file, skipinitialspace=True)

# Construct date from YYYY, MM, DD
df["date"] = pd.to_datetime(
    dict(year=df.YYYY, month=df.MM, day=df.DD)
)
#df

### 2. Cleaning & Resampling

Select the Kp, ap, and Ap columns, cast them to `float32`, and drop rows with missing values.

#### Temporal Coverage and Integrity

Before using the KP data, we explicitly measure its temporal coverage and detect missing days.
This avoids silent gaps that could propagate into model training or evaluation.
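A minimal sketch of such a coverage check, assuming a frame with one row per day and a `date` column. The helper name `report_daily_coverage` and the toy frame are hypothetical and not part of `mlfs.aurora.util`.

```python
import pandas as pd

def report_daily_coverage(frame: pd.DataFrame, date_col: str = "date") -> pd.DatetimeIndex:
    """Print the covered date range and return the calendar days that have no row."""
    dates = pd.to_datetime(frame[date_col]).dt.normalize()
    # Every calendar day between the first and last observation.
    expected = pd.date_range(dates.min(), dates.max(), freq="D")
    missing = expected.difference(pd.DatetimeIndex(dates.unique()))
    print(f"Coverage: {dates.min().date()} to {dates.max().date()}")
    print(f"Expected days: {len(expected)}, present: {dates.nunique()}, missing: {len(missing)}")
    return missing

# Toy example with one deliberately missing day (2020-01-03).
toy = pd.DataFrame({"date": pd.to_datetime(["2020-01-01", "2020-01-02", "2020-01-04"])})
missing_days = report_daily_coverage(toy)
```

Once `df_kp` exists, the same helper could be called as `report_daily_coverage(df_kp)`; an empty result would indicate that no day is missing between the first and last observation.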


In [6]:
df_kp = df[
    [
        "date",
        "Kp1", "Kp2", "Kp3", "Kp4", "Kp5", "Kp6", "Kp7", "Kp8",
        "ap1", "ap2", "ap3", "ap4", "ap5", "ap6", "ap7", "ap8",
        "Ap"
    ]
].copy()

# Ensure numeric types
feature_cols = [c for c in df_kp.columns if c != "date"]
df_kp[feature_cols] = df_kp[feature_cols].astype("float32")

# drop NaN values
df_kp.dropna(inplace=True)

#df_kp

### 3. (KP) Aurora-specific validation suite for geomagnetic features

In [7]:
import great_expectations as ge

kp_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="geomagnetic_expectation_suite"
)

for kp_col in ["kp1", "kp2", "kp3", "kp4", "kp5", "kp6", "kp7", "kp8"]:
    kp_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={
                "column": kp_col,
                "min_value": 0.0,
                "max_value": 9.0
            }
        )
    )

for ap_col in ["ap1", "ap2", "ap3", "ap4", "ap5", "ap6", "ap7", "ap8", "ap"]:
    kp_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_values_to_be_between",
            kwargs={
                "column": ap_col,
                "min_value": 0.0,
                "max_value": 400.0
            }
        )
    )
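Before inserting, the same bounds can be spot-checked locally with plain pandas. This is a rough pre-check under the assumption that column names are already lowercase, as the suite expects; it does not replace the Great Expectations validation that runs on insert. The names `BOUNDS` and `count_out_of_range` and the toy frame are hypothetical.

```python
import pandas as pd

# Bounds mirror the expectation suite: Kp in [0, 9], ap and Ap in [0, 400].
BOUNDS = {
    **{f"kp{i}": (0.0, 9.0) for i in range(1, 9)},
    **{f"ap{i}": (0.0, 400.0) for i in range(1, 9)},
    "ap": (0.0, 400.0),
}

def count_out_of_range(frame: pd.DataFrame, bounds: dict) -> dict:
    """Return {column: count of values outside [low, high]} for columns present in frame.

    NaN values are counted as violations because Series.between returns False for them.
    """
    violations = {}
    for col, (low, high) in bounds.items():
        if col in frame.columns:
            violations[col] = int((~frame[col].between(low, high)).sum())
    return violations

# Toy frame with one invalid Kp value (9.5).
toy = pd.DataFrame({"kp1": [0.0, 3.333, 9.5], "ap": [4.0, 400.0, 12.0]})
result = count_out_of_range(toy, BOUNDS)
print(result)
```

A non-zero count for any column points at rows worth inspecting before the feature group insert is attempted.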

### 4. Feature Group Creation and Insertion
1. Define Feature Group
2. Insert Data
3. Add description

In [8]:
df_kp.columns = df_kp.columns.str.lower()

# 1. Define Feature Group
geomagnetic_fg = fs.get_or_create_feature_group(
    name="geomagnetic_daily_final",
    description="Daily global geomagnetic activity indices (Kp, ap, Ap)",
    version=1,
    primary_key=["date"],
    event_time="date",
    expectation_suite=kp_expectation_suite
)

# 2. Insert Data
geomagnetic_fg.insert(df_kp, wait=True)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279154/fs/1265765/fg/1893783
2026-01-05 10:09:52,187 INFO: 	17 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279154/fs/1265765/fg/1893783


Uploading Dataframe: 100.00% |███████████████████████████| Rows 2190/2190 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: geomagnetic_daily_final_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279154/jobs/named/geomagnetic_daily_final_1_offline_fg_materialization/executions
2026-01-05 10:10:14,848 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-05 10:10:18,068 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-05 10:10:21,257 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-05 10:12:32,319 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2026-01-05 10:12:32,485 INFO: Waiting for log aggregation to finish.
2026-01-05 10:12:41,137 INFO: Execution finished successfully.


(Job('geomagnetic_daily_final_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_values_to_be_between",
         "kwargs": {
           "column": "ap4",
           "min_value": 0.0,
           "max_value": 400.0
         },
         "meta": {
           "expectationId": 804896
         }
       },
       "result": {
         "element_count": 2190,
         "missing_count": 0,
         "missing_percent": 0.0,
         "unexpected_count": 0,
         "unexpected_percent": 0.0,
         "unexpected_percent_total": 0.0,
         "unexpected_percent_nonmissing": 0.0,
         "partial_unexpected_list": []
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2026-01-05T09:09:52.000186Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_trac

In [9]:
# 3. Add descriptions
geomagnetic_fg.update_feature_description(
    "date",
    "Date of global geomagnetic observation (daily resolution)"
)

geomagnetic_fg.update_feature_description(
    "kp1",
    "Geomagnetic Kp index for 00:00–03:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp2",
    "Geomagnetic Kp index for 03:00–06:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp3",
    "Geomagnetic Kp index for 06:00–09:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp4",
    "Geomagnetic Kp index for 09:00–12:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp5",
    "Geomagnetic Kp index for 12:00–15:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp6",
    "Geomagnetic Kp index for 15:00–18:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp7",
    "Geomagnetic Kp index for 18:00–21:00 UTC"
)
geomagnetic_fg.update_feature_description(
    "kp8",
    "Geomagnetic Kp index for 21:00–24:00 UTC"
)

geomagnetic_fg.update_feature_description(
    "ap1",
    "Linear ap geomagnetic index corresponding to kp1"
)
geomagnetic_fg.update_feature_description(
    "ap2",
    "Linear ap geomagnetic index corresponding to kp2"
)
geomagnetic_fg.update_feature_description(
    "ap3",
    "Linear ap geomagnetic index corresponding to kp3"
)
geomagnetic_fg.update_feature_description(
    "ap4",
    "Linear ap geomagnetic index corresponding to kp4"
)
geomagnetic_fg.update_feature_description(
    "ap5",
    "Linear ap geomagnetic index corresponding to kp5"
)
geomagnetic_fg.update_feature_description(
    "ap6",
    "Linear ap geomagnetic index corresponding to kp6"
)
geomagnetic_fg.update_feature_description(
    "ap7",
    "Linear ap geomagnetic index corresponding to kp7"
)
geomagnetic_fg.update_feature_description(
    "ap8",
    "Linear ap geomagnetic index corresponding to kp8"
)

geomagnetic_fg.update_feature_description(
    "ap",
    "Daily average linear geomagnetic ap index"
)

<hsfs.feature_group.FeatureGroup at 0x1bb5f38ee90>
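The descriptions above follow a regular pattern, so they could also be generated rather than written out one call at a time. The sketch below only builds the mapping; the dictionary name `descriptions` is hypothetical, and the wording is copied from the cell above.

```python
# Build the per-feature descriptions programmatically.
descriptions = {"date": "Date of global geomagnetic observation (daily resolution)"}

for i in range(1, 9):
    # Each Kp/ap value covers one 3-hour UTC window of the day.
    start, end = 3 * (i - 1), 3 * i
    descriptions[f"kp{i}"] = f"Geomagnetic Kp index for {start:02d}:00–{end:02d}:00 UTC"
    descriptions[f"ap{i}"] = f"Linear ap geomagnetic index corresponding to kp{i}"

descriptions["ap"] = "Daily average linear geomagnetic ap index"

for name, text in descriptions.items():
    print(f"{name}: {text}")
```

Each entry could then be passed to `geomagnetic_fg.update_feature_description(name, text)`, the same call used in the cell above.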

## DB2 - Weather Data
This section ingests historical weather data for Sweden.

Weather variables provide terrestrial context that may influence visibility conditions,
measurement reliability, or secondary correlations with geomagnetic activity.

### Data Source and Scope

Weather data is retrieved for a single fixed location (latitude 62.0, longitude 15.0) from a historical weather API via `util.get_historical_weather_sweden`.
The dataset is aggregated to daily resolution to match the temporal granularity
of KP and solar wind features.
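The internals of `util.get_historical_weather_sweden` are not shown here, so the following is only a sketch of what a daily aggregation could look like if the raw data were hourly. The input frame, its column names (`time`, `cloud_cover`, `precipitation`), and all values are assumptions made for the example; the output columns match the ones used in this section.

```python
import pandas as pd

# Hypothetical hourly input: two days of invented values.
hourly = pd.DataFrame({
    "time": pd.date_range("2020-01-01", periods=48, freq=pd.Timedelta(hours=1)),
    "cloud_cover": [50.0] * 24 + [100.0] * 24,
    "precipitation": [0.0] * 24 + [0.5] * 24,
    "sunshine_duration": [0.0] * 48,
})

# Cloud cover is averaged over the day; precipitation and sunshine are summed.
daily = (
    hourly.set_index("time")
    .resample("D")
    .agg({"cloud_cover": "mean", "precipitation": "sum", "sunshine_duration": "sum"})
    .rename(columns={"cloud_cover": "cloud_cover_mean", "precipitation": "precipitation_sum"})
    .rename_axis("date")
    .reset_index()
)

print(daily)
```

Using a mean for cloud cover and sums for precipitation and sunshine keeps each daily value in the units described for the feature group (percent, millimeters, seconds).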


### 1. Data Retrieval

In [10]:
earliest_kp_date = df_kp["date"].min().strftime("%Y-%m-%d")
end_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")

latitude = 62.0
longitude = 15.0

weather_df = util.get_historical_weather_sweden(
    earliest_kp_date,
    end_date,
    latitude,
    longitude
)

weather_df.head()


         date  cloud_cover_mean  precipitation_sum  sunshine_duration
0  2020-01-01         92.333336                0.0        3612.911377
1  2020-01-02            96.625                0.0                0.0
2  2020-01-03         52.958332                0.0        7516.577148
3  2020-01-04              19.5                0.0        8072.493164
4  2020-01-05              77.0                1.0                0.0


### 2. Schema Inspection

In [11]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2196 entries, 0 to 2195
Data columns (total 4 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               2196 non-null   datetime64[ns]
 1   cloud_cover_mean   2196 non-null   float32       
 2   precipitation_sum  2196 non-null   float32       
 3   sunshine_duration  2196 non-null   float32       
dtypes: datetime64[ns](1), float32(3)
memory usage: 43.0 KB


### 3. Weather validation suite

In [12]:
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="sweden_weather_expectation_suite"
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "cloud_cover_mean",
            "min_value": 0.0,
            "max_value": 100.0
        }
    )
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "precipitation_sum",
            "min_value": 0.0
        }
    )
)

weather_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "sunshine_duration",
            "min_value": 0.0
        }
    )
)

{"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "sunshine_duration", "min_value": 0.0}, "meta": {}}

### 4. Feature Group Creation and Insertion

In [15]:
weather_fg = fs.get_or_create_feature_group(
    name="sweden_weather_daily_final",
    description="Daily weather conditions in Sweden relevant for aurora observability",
    version=1,
    primary_key=["date"],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

weather_fg.insert(weather_df, wait=True)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279154/fs/1265765/fg/1890740
2026-01-05 10:16:59,060 INFO: 	3 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279154/fs/1265765/fg/1890740


Uploading Dataframe: 100.00% |███████████████████████████| Rows 2196/2196 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: sweden_weather_daily_final_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279154/jobs/named/sweden_weather_daily_final_1_offline_fg_materialization/executions
2026-01-05 10:17:17,557 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2026-01-05 10:17:20,722 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2026-01-05 10:17:23,903 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2026-01-05 10:19:18,364 INFO: Waiting for log aggregation to finish.
2026-01-05 10:19:26,968 INFO: Execution finished successfully.


(Job('sweden_weather_daily_final_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_values_to_be_between",
         "kwargs": {
           "column": "sunshine_duration",
           "min_value": 0.0
         },
         "meta": {
           "expectationId": 803879
         }
       },
       "result": {
         "element_count": 2196,
         "missing_count": 0,
         "missing_percent": 0.0,
         "unexpected_count": 0,
         "unexpected_percent": 0.0,
         "unexpected_percent_total": 0.0,
         "unexpected_percent_nonmissing": 0.0,
         "partial_unexpected_list": []
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2026-01-05T09:16:59.000060Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
 

In [16]:
weather_fg.update_feature_description(
    "date",
    "Date of daily weather observation for Sweden"
)

weather_fg.update_feature_description(
    "cloud_cover_mean",
    "Mean daily cloud cover in percent (0–100), affecting aurora visibility"
)

weather_fg.update_feature_description(
    "precipitation_sum",
    "Total daily precipitation in millimeters (rain or snow)"
)

weather_fg.update_feature_description(
    "sunshine_duration",
    "Total duration of sunshine during the day in seconds, proxy for sky clarity"
)

<hsfs.feature_group.FeatureGroup at 0x1bb5f579420>