In [30]:
import sys
from pathlib import Path
#from mlfs import config

#Find root directory
root_dir = Path().absolute()
if root_dir.parts[-2:] == ('notebooks', 'algae_bloom'):
    root_dir = Path(*root_dir.parts[:-2])
root_dir = str(root_dir)

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")


Root dir: /Users/kevinkokalari/Documents/Skalbar MaskininlaÃàrning och DjupinlaÃàrning/Laborationer/id2223-project


<span style="font-width:bold; font-size: 3rem; color:#333;">- Daily Feature Pipeline for Water Temperature and Weather</span>

## üåê Imports

In [31]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
from mlfs import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## üôä Get variebles from Hopsworks secret </span>


In [32]:
project = hopsworks.login(engine="python")
fs = project.get_feature_store() 
secrets = hopsworks.get_secrets_api()

#Get the dictionary with all batch locations
bath_locations_json = secrets.get_secret("BATH_LOCATIONS_JSON").value
bath_locations = json.loads(bath_locations_json)
print(bath_locations)

#Set date to today
today = datetime.date.today()

2025-12-21 20:49:00,342 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-12-21 20:49:00,348 INFO: Initializing external client
2025-12-21 20:49:00,348 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-12-21 20:49:01,840 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286295
[{'bath_location': 'Bergabadet', 'latitude': 59.057008, 'longitude': 17.440774}, {'bath_location': 'Br√§nningestrand', 'latitude': 59.148617, 'longitude': 17.6674}, {'bath_location': 'Eklundsn√§sbadet', 'latitude': 59.16883, 'longitude': 17.59184}, {'bath_location': 'Farstan√§sbadet', 'latitude': 59.096884, 'longitude': 17.65387}, {'bath_location': 'M√§larbadet', 'latitude': 59.222657, 'longitude': 17.611886}, {'bath_location': 'Nya Malmsj√∂badet', 'latitude': 59.234823, 'longitude': 17.536534}, {'bath_location': 'N√§sets udde(Glashyttan)', 'latitude': 59.158419, 'longitude': 17.66072}, {'bath_location': 'Under√•sbadet', 'latitude': 59.26482, 'longitude': 17.536534}, {'bath_location': '√Öbyn√§sbadet', 'latitude': 59.018397, 'longitude': 17.619576}]


## ‚òÇÔ∏è Get feature groups

In [33]:
#Select version of feature group
wt_version = 1
w_version = 1

#Retrieve feature groups
water_temperature_fg = fs.get_feature_group( 
    name='water_temperature',
    version=wt_version,
)

weather_fg = fs.get_feature_group(
    name='weather',
    version=w_version,
)

---

## üí¶ Get today's water temperature data


In [34]:
import requests
import pandas as pd

wt_today_df = util.get_wt()

wt_today_df

Saved: 2 rows to watertemp_midday.csv


Unnamed: 0,temp_water,formatted_time,alias,latitude,longitude
0,6.1,2025-12-19 13:36:00,M√§larbadet,59.222657,17.611886


In [35]:
wt_today_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   temp_water      1 non-null      float64       
 1   formatted_time  1 non-null      datetime64[ns]
 2   alias           1 non-null      object        
 3   latitude        1 non-null      float64       
 4   longitude       1 non-null      float64       
dtypes: datetime64[ns](1), float64(3), object(1)
memory usage: 172.0+ bytes


## üîô Add lagged water temperature

In [36]:
#Load dataframe and covert dates to not include time
features_df = water_temperature_fg.read()
features_df["formatted_time"] = pd.to_datetime(features_df["formatted_time"])#.dt.date
features_df

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.21s) 


Unnamed: 0,formatted_time,alias,temp_water,longitude,latitude,lagged_wt_1_day,lagged_wt_2_days,lagged_wt_3_days
0,2025-02-27 13:53:00+00:00,√Öbyn√§sbadet,2.0,17.619576,59.018397,1.9,1.7,1.7
1,2022-10-26 10:40:00+00:00,Eklundsn√§sbadet,8.3,17.591840,59.168830,11.3,8.4,5.3
2,2022-07-22 11:44:00+00:00,Br√§nningestrand,20.8,17.667400,59.148617,20.4,18.9,18.8
3,2022-12-11 10:11:00+00:00,Farstan√§sbadet,3.9,17.653870,59.096884,4.0,5.0,4.4
4,2024-06-30 14:45:00+00:00,Farstan√§sbadet,20.2,17.653870,59.096884,21.4,21.4,21.0
...,...,...,...,...,...,...,...,...
7407,2023-02-13 09:49:00+00:00,Eklundsn√§sbadet,3.6,17.591840,59.168830,3.4,3.3,3.0
7408,2024-02-22 10:17:00+00:00,Eklundsn√§sbadet,2.5,17.591840,59.168830,2.6,2.5,2.5
7409,2025-12-19 13:36:00+00:00,M√§larbadet,6.1,17.611886,59.222657,6.1,6.2,6.2
7410,2025-12-20 13:36:00+00:00,M√§larbadet,6.0,17.611886,59.222657,6.1,6.1,6.2


In [37]:
yesterday = today - datetime.timedelta(days=1)
two_days_ago = today - datetime.timedelta(days=2)
three_days_ago = today - datetime.timedelta(days=3)
print("yesterday", yesterday)
print("two_days_ago", two_days_ago)
print("three_days_ago", three_days_ago)

yesterday 2025-12-20
two_days_ago 2025-12-19
three_days_ago 2025-12-18


In [38]:
wt_today_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   temp_water      1 non-null      float64       
 1   formatted_time  1 non-null      datetime64[ns]
 2   alias           1 non-null      object        
 3   latitude        1 non-null      float64       
 4   longitude       1 non-null      float64       
dtypes: datetime64[ns](1), float64(3), object(1)
memory usage: 172.0+ bytes


In [39]:


# One row per (street, date) for each lag
yesterday_df = (features_df[features_df["formatted_time"].dt.date == yesterday][["alias", "temp_water"]].rename(columns={"temp_water": "lagged_wt_1_day"}))

two_days_ago_df = (features_df[features_df["formatted_time"].dt.date == two_days_ago][["alias", "temp_water"]].rename(columns={"temp_water": "lagged_wt_2_days"}))

three_days_ago_df = (features_df[features_df["formatted_time"].dt.date == three_days_ago][["alias", "temp_water"]].rename(columns={"temp_water": "lagged_wt_3_days"}))

# Merge lagged values into today's rows, per street/sensor
wt_today_df = (wt_today_df.merge(yesterday_df, on="alias", how="left").merge(two_days_ago_df, on="alias", how="left").merge(three_days_ago_df, on="alias", how="left"))


wt_today_df

Unnamed: 0,temp_water,formatted_time,alias,latitude,longitude,lagged_wt_1_day,lagged_wt_2_days,lagged_wt_3_days
0,6.1,2025-12-19 13:36:00,M√§larbadet,59.222657,17.611886,6.0,6.1,6.1
1,6.1,2025-12-19 13:36:00,M√§larbadet,59.222657,17.611886,6.0,6.1,6.1


## üå¶ Get weather data

In [None]:
daily_df = None



for index, row in wt_today_df.iterrows():

    current_datetime = row["formatted_time"]
    #rounded_datetime = row["rounded"].dt.round("H")

    lower_bound = (current_datetime.round("H") - pd.Timedelta(minutes=1)).time()
    upper_bound = (current_datetime.round("H") + pd.Timedelta(minutes=1)).time()



    hourly_df = util.get_hourly_weather_forecast(row["latitude"], row["longitude"])



    hourly_df = hourly_df.set_index('date')
    hourly_df = hourly_df.sort_values(by="date")

    # We will only make 1 daily prediction, so we will replace the hourly forecasts with a single daily forecast


    daily_df_partial = hourly_df.iloc[0:23].between_time(lower_bound, upper_bound)
    
    daily_df_partial = pd.concat([daily_df_partial, hourly_df.iloc[24:].between_time("11:59", "12:01")])
    print(daily_df_partial.info())

    daily_df_partial = daily_df_partial.reset_index()
    #daily_df_partial['formatted_time'] = pd.to_datetime(daily_df_partial['formatted_time']).dt.date
    #daily_df_partial['rounded'] = pd.to_datetime(daily_df_partial['rounded'])
    daily_df_partial['formatted_time'] = daily_df_partial["date"].copy()
    daily_df_partial.iloc[0]["formatted_time"] = current_datetime

    daily_df_partial = daily_df_partial.drop(columns="date")
    
    
    daily_df_partial["alias"] = row["alias"]
    daily_df_partial["latitude"] = row["latitude"]
    daily_df_partial["longitude"] = row["longitude"]
    #daily_df_partial = daily_df_partial.drop("rounded")

    if daily_df is None:
        daily_df = daily_df_partial
    else:
        daily_df=pd.concat([daily_df, daily_df_partial], ignore_index=True)
        
#daily_df = daily_df.drop(columns="date")
daily_df

Coordinates 59.25¬∞N 17.5¬∞E
Elevation 0.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Hourly: <openmeteo_sdk.VariablesWithTime.VariablesWithTime object at 0x17505dc60>
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 7 entries, 2025-12-21 14:00:00 to 2025-12-27 12:00:00
Data columns (total 4 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   temperature_2m      7 non-null      float32
 1   precipitation       7 non-null      float32
 2   wind_speed_10m      7 non-null      float32
 3   wind_direction_10m  7 non-null      float32
dtypes: float32(4)
memory usage: 168.0 bytes
None
Coordinates 59.25¬∞N 17.5¬∞E
Elevation 0.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Hourly: <openmeteo_sdk.VariablesWithTime.VariablesWithTime object at 0x175017af0>
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 7 entries, 2025-12-21 14:00:00 to 2025-12-27 12:00:00
Data columns (total 4 columns):
 #   Column 

AttributeError: 'DataFrame' object has no attribute 'concat'

In [None]:
if not daily_df is None:
    daily_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   temperature_2m      7 non-null      float32       
 1   precipitation       7 non-null      float32       
 2   wind_speed_10m      7 non-null      float32       
 3   wind_direction_10m  7 non-null      float32       
 4   formatted_time      7 non-null      datetime64[ns]
 5   alias               7 non-null      object        
 6   latitude            7 non-null      float64       
 7   longitude           7 non-null      float64       
dtypes: datetime64[ns](1), float32(4), float64(2), object(1)
memory usage: 468.0+ bytes


## üÜô Upload data to feature groups

In [None]:
# Insert new data
water_temperature_fg.insert(wt_today_df)

2025-12-19 14:44:30,628 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1286295/fs/1265767/fg/1840705


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: water_temperature_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286295/jobs/named/water_temperature_1_offline_fg_materialization/executions


(Job('water_temperature_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "temp_water",
           "min_value": -5,
           "max_value": 40,
           "strict_min": true
         },
         "meta": {
           "expectationId": 787475
         }
       },
       "result": {
         "observed_value": 6.1,
         "element_count": 1,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-12-19T01:44:30.000627Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_ex

In [None]:
# Insert new data
weather_fg.insert(daily_df, wait=True)

2025-12-19 14:44:44,590 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1286295/fs/1265767/fg/1869180


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 7/7 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286295/jobs/named/weather_1_offline_fg_materialization/executions
2025-12-19 14:45:02,002 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-12-19 14:45:05,197 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-12-19 14:47:19,193 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-12-19 14:47:28,746 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-19 14:47:28,951 INFO: Waiting for log aggregation to finish.
2025-12-19 14:47:44,360 INFO: Execution finished successfully.


(Job('weather_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 797723
         }
       },
       "result": {
         "observed_value": 7.386581897735596,
         "element_count": 7,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-12-19T01:44:44.000590Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "exp