# Feature pipeline (daily)

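This notebook fetches day-ahead prices, generation per production type and actual load for bidding zone SE_3 from the ENTSO-E API, adds temperature observations from SMHI, combines everything into a single feature table and inserts it into a Hopsworks feature group.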

### Load imports

In [1]:
import os
import time
from datetime import datetime, timedelta, date

import hopsworks
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from entsoe import EntsoePandasClient

#### Helper functions (timestamp)

In [48]:
# # functions for replacing date and time with timestamp (seconds since 1970-01-01)

# def entsoe_timestamp_2_time(x):
#     dt_obj = datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
#     dt_obj = dt_obj.timestamp() * 1000
#     return int(dt_obj)

# def weather_timestamp_2_time(x, i):
#     dt_obj = datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
#     dt_obj = dt_obj + timedelta(hours=i)
#     dt_obj = dt_obj.timestamp() * 1000

#     return int(dt_obj)

## Fetch & Parse data

In [3]:
## Date window for the queries below, expressed as 'YYYYMMDD' strings

# Fixed start date, used to retrieve earlier dates (backfill)
date_from = "20230103"

# Alternative for the daily run: start from yesterday
# date_from = (datetime.now() - timedelta(days=1)).date().strftime('%Y%m%d')

# The window ends seven days after the start date
window_start = datetime.strptime(date_from, '%Y%m%d')
date_to = (window_start + timedelta(days=7)).strftime('%Y%m%d')

# Show the selected window
date_from, date_to

('20230103', '20230110')

### Entsoe API

In [4]:
# Client
# The ENTSO-E API token is read from the environment so that it is never stored in the notebook.
# NOTE: a token was previously hard-coded in this cell; it should be revoked and replaced.
entsoe_api_key = os.environ.get("ENTSOE_API_KEY")
if not entsoe_api_key:
    raise RuntimeError("Set the ENTSOE_API_KEY environment variable to your ENTSO-E API token.")
client = EntsoePandasClient(api_key=entsoe_api_key)

# Date and country
# The query window is interpreted in Swedish local time; SE_3 is the bidding zone queried below.
start = pd.Timestamp(date_from, tz='Europe/Stockholm')
end = pd.Timestamp(date_to, tz='Europe/Stockholm')
country_code = 'SE_3'

In [9]:
## Query ENTSO-E for the selected bidding zone and time window

# Day-ahead prices
df_day_price = client.query_day_ahead_prices(
    country_code,
    start=start,
    end=end,
)

# Generation per production type (psr_type=None: no production-type filter is passed)
df_generation_per_prod = client.query_generation(
    country_code,
    start=start,
    end=end,
    psr_type=None,
)

# Actual load (consumption)
df_load = client.query_load(
    country_code,
    start=start,
    end=end,
)

In [10]:
# Preview the first rows of the per-production-type generation frame
df_generation_per_prod.head()

Unnamed: 0,Fossil Gas,Hydro Water Reservoir,Nuclear,Other,Solar,Wind Onshore
2023-01-03 00:00:00+01:00,0.0,689.0,5798.0,832.0,0.0,814.0
2023-01-03 01:00:00+01:00,0.0,671.0,5797.0,825.0,0.0,839.0
2023-01-03 02:00:00+01:00,0.0,672.0,5797.0,837.0,0.0,893.0
2023-01-03 03:00:00+01:00,0.0,666.0,5797.0,845.0,0.0,926.0
2023-01-03 04:00:00+01:00,0.0,662.0,5798.0,878.0,0.0,935.0


In [32]:
# Combine the ENTSO-E results on their shared time index:
# generation columns first, then the price series (named "day_ahead_price"), then the load
df_entsoe = (
    df_generation_per_prod
    .join(df_day_price.rename("day_ahead_price"))
    .join(df_load)
)


In [33]:
# Move the time index into a regular column called 'DateTime'
df_entsoe_clean = df_entsoe.reset_index().rename(columns={'index': 'DateTime'})

# Replace the timestamps by int64 epoch values: nanoseconds first, then // 10^6 to get milliseconds
epoch_ns = df_entsoe_clean['DateTime'].values.astype('int64')
df_entsoe_clean['DateTime'] = epoch_ns // 10 ** 6


In [34]:
# DateTime now holds epoch milliseconds; the source timestamps were Europe/Stockholm local time
df_entsoe_clean  # source index was UTC+1 for the dates shown

Unnamed: 0,DateTime,Fossil Gas,Hydro Water Reservoir,Nuclear,Other,Solar,Wind Onshore,day_ahead_price,Actual Load
0,1672700400000,0.0,689.0,5798.0,832.0,0.0,814.0,78.81,10026.0
1,1672704000000,0.0,671.0,5797.0,825.0,0.0,839.0,73.93,9951.0
2,1672707600000,0.0,672.0,5797.0,837.0,0.0,893.0,73.94,9940.0
3,1672711200000,0.0,666.0,5797.0,845.0,0.0,926.0,71.44,9933.0
4,1672714800000,0.0,662.0,5798.0,878.0,0.0,935.0,72.33,9898.0
...,...,...,...,...,...,...,...,...,...
163,1673287200000,0.0,1509.0,5793.0,864.0,0.0,709.0,157.70,11754.0
164,1673290800000,0.0,1473.0,5795.0,876.0,0.0,626.0,145.07,11381.0
165,1673294400000,0.0,1422.0,5795.0,814.0,0.0,550.0,134.62,10835.0
166,1673298000000,0.0,1126.0,5796.0,784.0,0.0,512.0,83.84,10257.0


### SMHI

In [35]:
import json
from urllib.request import urlopen
from pandas import json_normalize

In [36]:
## Fetch SMHI observations (parameter 1, station 71420, latest months)
url = "https://opendata-download-metobs.smhi.se/api/version/latest/parameter/1/station/71420/period/latest-months/data.json"

# Open the response as a context manager so the HTTP connection is always closed,
# also when reading or parsing fails
with urlopen(url) as response:
    data_json = json.loads(response.read())

# One row per entry in data_json['value']
df_smhi_data = json_normalize(data_json['value'])

# First and last ENTSO-E timestamps (epoch milliseconds) delimit the interval to keep
timeseries_from = df_entsoe_clean["DateTime"].iloc[0]
timeseries_to = df_entsoe_clean["DateTime"].iloc[-1]

# Keep only the observations inside that interval (both ends inclusive)
in_window = (df_smhi_data['date'] >= timeseries_from) & (df_smhi_data['date'] <= timeseries_to)
df_smhi_data = df_smhi_data.loc[in_window]

# reset_index() keeps the old row labels in an 'index' column, which the
# combine/clean cell drops later; 'date' is renamed to match the ENTSO-E frame
df_smhi_data = df_smhi_data.reset_index().rename(columns = {'date':'DateTime'})


In [37]:
# Preview the SMHI rows that fall inside the ENTSO-E time interval
df_smhi_data.head()

Unnamed: 0,index,DateTime,value,quality
0,2852,1672700400000,3.6,G
1,2853,1672704000000,1.9,G
2,2854,1672707600000,0.8,G
3,2855,1672711200000,0.3,G
4,2856,1672714800000,0.6,G


## Combine & clean final data

In [38]:
# Production-type columns that are summed into total_generation.
# Note: "Fossil Gas" is not part of the sum; it is only dropped below.
col_list = ["Hydro Water Reservoir", "Nuclear", "Other", "Solar", "Wind Onshore"]

df_feature_data = (
    # keep only the timestamps present in both the ENTSO-E and the SMHI frame
    df_entsoe_clean.merge(df_smhi_data, how='inner', on='DateTime')
    # row-wise sum of the listed production types
    .assign(total_generation=lambda frame: frame[col_list].sum(axis=1))
    # drop the summed columns and the redundant/irrelevant ones
    .drop(columns=col_list + ["Fossil Gas", "index", "quality"])
    # the SMHI observation value is converted into float type
    .astype({"value": float})
    # rename to the matching column names
    .rename(columns={"Actual Load": "total_load", "value": "temperature", "DateTime":"datetime"})
)

df_feature_data.head()

Unnamed: 0,datetime,day_ahead_price,total_load,temperature,total_generation
0,1672700400000,78.81,10026.0,3.6,8133.0
1,1672704000000,73.93,9951.0,1.9,8132.0
2,1672707600000,73.94,9940.0,0.8,8199.0
3,1672711200000,71.44,9933.0,0.3,8234.0
4,1672714800000,72.33,9898.0,0.6,8273.0


## Add to feature group

In [39]:
import hopsworks

# Log in to Hopsworks and get a handle to the project's feature store
project = hopsworks.login() 
fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/4247




Connected. Call `.close()` to terminate connection gracefully.


In [88]:
# Look up feature group 'new_electricity_data_fg' (version 2) in the feature store, creating it if needed
new_electricity_data_fg = fs.get_or_create_feature_group(name = 'new_electricity_data_fg', version = 2)

In [89]:
# Upload the combined feature frame to the feature group
new_electricity_data_fg.insert(df_feature_data)

Uploading Dataframe: 100.00% |██████████| Rows 24/24 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/4247/jobs/named/new_electricity_data_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7fad00bc7670>, None)

### Modal script for future daily features
To retrieve the daily data with a scheduled script, Modal is used: the following function is uploaded and scheduled to run once a day.

In [None]:
import os
import modal

# Set to True to run g() in this process instead of on Modal
LOCAL = False

if not LOCAL:
    stub = modal.Stub()
    # Container image for the scheduled run.
    # "scikit-learn" replaces the deprecated "sklearn" PyPI name, which can no longer be installed;
    # "entsoe-py" is needed because the scheduled code queries ENTSO-E.
    image = modal.Image.debian_slim().pip_install(
        ["hopsworks==3.0.4", "joblib", "seaborn", "scikit-learn", "dataframe-image", "entsoe-py"]
    )

    @stub.function(image=image, schedule=modal.Period(days=1), secret=modal.Secret.from_name("abyel-hopsworks-secret"))
    def f():
        """Entry point registered with Modal on a one-day period; delegates to g()."""
        g()

def get_entsoe_data():
    """
    Query ENTSO-E and return prices, generation and load in one frame.

    Reads the module-level `client`, `country_code`, `start` and `end`, which
    must be defined before this function is called.

    Returns:
        DataFrame with a 'DateTime' column (int64 milliseconds since the Unix
        epoch), the generation columns returned by ENTSO-E, a
        'day_ahead_price' column and the load column(s).
    """
    # Day-ahead price, generation per production type and actual load
    df_day_price = client.query_day_ahead_prices(country_code, start=start, end=end)
    df_generation_per_prod = client.query_generation(country_code, start=start, end=end, psr_type=None)
    df_load = client.query_load(country_code, start=start, end=end)

    # Join everything on the shared time index
    df_entsoe = df_generation_per_prod.join(df_day_price.rename("day_ahead_price"))
    df_entsoe = df_entsoe.join(df_load)

    # Turn the time index into an integer 'DateTime' column
    df_entsoe_clean = df_entsoe.reset_index()
    df_entsoe_clean = df_entsoe_clean.rename(columns={'index': 'DateTime'})
    # divide by 10^6 to convert from ns to ms
    df_entsoe_clean['DateTime'] = df_entsoe_clean.DateTime.values.astype('int64') // 10 ** 6

    # The frame was previously built but never returned, so callers received None
    return df_entsoe_clean


def get_prediction_data():
    """
    Fetches the recent prediction and actual data for electricity price and weather.

    Combines the ENTSO-E frame from get_entsoe_data() with the SMHI
    observations that fall inside the same time interval, following the same
    steps as the notebook cells that build the feature table.

    Returns:
        DataFrame with the columns 'datetime', 'day_ahead_price', 'total_load',
        'temperature' and 'total_generation'.
    """
    import json
    from urllib.request import urlopen

    import pandas as pd

    df_entsoe_clean = get_entsoe_data()

    # SMHI observations (parameter 1, station 71420, latest months)
    url = "https://opendata-download-metobs.smhi.se/api/version/latest/parameter/1/station/71420/period/latest-months/data.json"
    with urlopen(url) as response:
        data_json = json.loads(response.read())
    df_smhi_data = pd.json_normalize(data_json['value'])

    # Keep only the observations between the first and last ENTSO-E timestamp (inclusive)
    timeseries_from = df_entsoe_clean["DateTime"].iloc[0]
    timeseries_to = df_entsoe_clean["DateTime"].iloc[-1]
    in_window = (df_smhi_data['date'] >= timeseries_from) & (df_smhi_data['date'] <= timeseries_to)
    df_smhi_data = df_smhi_data.loc[in_window].rename(columns={'date': 'DateTime'})

    # Only timestamps present in both sources are kept
    df_feature_data = df_entsoe_clean.merge(df_smhi_data, how='inner', on='DateTime')

    # total_generation is the row-wise sum of these production types;
    # "Fossil Gas" is dropped without being part of the sum
    col_list = ["Hydro Water Reservoir", "Nuclear", "Other", "Solar", "Wind Onshore"]
    df_feature_data['total_generation'] = df_feature_data[col_list].sum(axis=1)
    df_feature_data = df_feature_data.drop(columns=col_list + ["Fossil Gas", "quality"])

    df_feature_data["value"] = df_feature_data["value"].astype(float)
    return df_feature_data.rename(
        columns={"Actual Load": "total_load", "value": "temperature", "DateTime": "datetime"}
    )


def g():
    """
    Fetch the newest feature rows and insert them into the Hopsworks feature group.

    Logs in to Hopsworks, looks up feature group 'electricity_data_updated_fg'
    (version 1), builds the new rows with get_prediction_data() and inserts
    them without waiting for the ingestion job to finish.
    """
    import hopsworks

    project = hopsworks.login()
    fs = project.get_feature_store()

    electricity_data_fg = fs.get_feature_group(name="electricity_data_updated_fg", version=1)

    # NOTE(review): the query window comes from the module-level start/end used by
    # get_entsoe_data(); confirm they are set for the day that should be fetched.
    new_electricity_df = get_prediction_data()

    # Show the rows that are about to be written
    print(new_electricity_df.head(5))

    # Insert the new rows (previously the feature group object itself was passed here)
    electricity_data_fg.insert(new_electricity_df, write_options={"wait_for_job" : False})

if __name__ == "__main__":
    # Run the pipeline in this process when LOCAL is set; otherwise run f() through the Modal stub
    if LOCAL:
        g()
    else:
        with stub.run():
            f()