In [25]:
import getpass
import os
import warnings
from math import pi, cos, asin, sqrt

import hopsworks
import pandas as pd
import requests

In [21]:
# Earth radius and diameter in kilometres; the diameter is the factor the
# haversine formula in `distance` multiplies the arc-sine by.
CONST_EARTH_RADIUS = 6371
CONST_EARTH_DIAMETER = 2 * CONST_EARTH_RADIUS

# Only weather stations closer than this many kilometres to the city are used.
STATIONS_WITHIN_DISTANCE = 50

In [2]:
# Never commit credentials to a notebook: the API key is taken from the
# HOPSWORKS_API_KEY environment variable, falling back to an interactive
# prompt that does not echo the value. Any key that was previously stored in
# this cell should be considered leaked and rotated.
hopsworks_api_key = os.environ.get("HOPSWORKS_API_KEY") or getpass.getpass("Hopsworks API key: ")

project = hopsworks.login(
    host="2a5f8040-2d0d-11ed-b5c5-c151c2fe58c1.cloud.hopsworks.ai",
    project="electricity",
    api_key_value=hopsworks_api_key,
)
# Handle to the project's feature store, used below to create the feature group.
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://2a5f8040-2d0d-11ed-b5c5-c151c2fe58c1.cloud.hopsworks.ai:443/p/7287
Connected. Call `.close()` to terminate connection gracefully.




In [28]:
def distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Haversine distance in km between two points given in degrees.

    Args:
        lat1 (float): Latitude of the first point.
        lon1 (float): Longitude of the first point.
        lat2 (float): Latitude of the second point.
        lon2 (float): Longitude of the second point.
    Returns:
        float: Distance in km between the two points, scaled by
            CONST_EARTH_DIAMETER.
    """
    deg_to_rad = pi / 180.0
    # Contribution of the latitude difference: (1 - cos(dlat)) / 2.
    lat_term = 0.5 - cos((lat2 - lat1) * deg_to_rad) / 2.0
    # Contribution of the longitude difference, weighted by both latitudes.
    lon_term = cos(lat1 * deg_to_rad) * cos(lat2 * deg_to_rad) * (1.0 - cos((lon2 - lon1) * deg_to_rad)) / 2
    haversine = lat_term + lon_term
    return CONST_EARTH_DIAMETER * asin(sqrt(haversine))

def hsmi_measurment_data(measurement, period, city_name="Stockholm"):
    """Download SMHI observations of one measurement from stations near a city.

    Active stations closer than STATIONS_WITHIN_DISTANCE km to the city are
    looked up, the CSV export of every such station is downloaded for the
    requested period, and the rows after 2020-12-31 of all stations are
    combined into one frame.

    Args:
        measurement (str): One of the measurement names mapped to an SMHI
            parameter id below; it is also used as the name of the value
            column in the result.
        period (str): "corrected-archive", "latest-months" or "latest-day".
        city_name (str): Value of the ``City`` column in the city coordinates
            CSV to centre the station search on. Defaults to "Stockholm".
    Returns:
        pd.DataFrame: Columns ``day`` and ``<measurement>`` for the daily
            layout, or ``day``, ``time`` and ``<measurement>`` for the
            sub-daily layout, with rows containing missing values removed.
            The frame is empty (never None) when no station delivered data.
    Raises:
        ValueError: If the measurement, the period or the city is unknown, or
            if no CSV layout is defined for the measurement's parameter.
    """
    # Measurement name -> SMHI parameter id (descriptions translated from the
    # Swedish SMHI parameter names).
    parameter_by_measurement = {
        # 39: dew point temperature, instantaneous value, once per hour.
        # NOTE(review): the measurement name suggests air temperature while 39
        # is the dew point parameter -- confirm this is intended.
        "temp_per_last_hour": 39,
        # 2: air temperature, mean over one day, once per day at 00:00.
        "mean_temp_per_day": 2,
        # 18: precipitation, once per day at 18:00.
        "precipitaton_type": 18,
        # 14: precipitation amount, sum over 15 minutes, four times per hour.
        "precipitaton_amount": 14,
        # 7: precipitation amount, sum over one hour, once per hour.
        "precipitaton_amount_last_hour": 7,
        # 10: sunshine duration, sum over one hour, once per hour (seconds).
        "sunshine_time": 10,
        # 16: total cloud cover, instantaneous value, once per hour (percent).
        "cloud_perc": 16,
        # 4: wind speed, 10-minute mean, once per hour (metres per second).
        "wind_speed": 4,
    }
    if measurement not in parameter_by_measurement:
        raise ValueError(
            f"Unknown measurement {measurement!r}; expected one of {sorted(parameter_by_measurement)}"
        )
    parameter = parameter_by_measurement[measurement]

    supported_periods = ("corrected-archive", "latest-months", "latest-day")
    if period not in supported_periods:
        raise ValueError(f"Unknown period {period!r}; expected one of {supported_periods}")

    # The CSV exports come in two layouts that differ in the number of
    # preamble lines to skip and in their columns.
    if parameter in (2, 18, 5):
        skiprows = 12
        column_names = ["from", "to", "day", measurement, "quality", "time_slice", "comment"]
    elif parameter in (10, 16, 4, 39, 14):
        skiprows = 11
        column_names = ["day", "time", measurement, "quality", "time_slice", "comment"]
    else:
        raise ValueError(
            f"No CSV layout is defined for SMHI parameter {parameter} (measurement {measurement!r})"
        )
    dropped_columns = {"from", "to", "quality", "time_slice", "comment"}
    kept_columns = [name for name in column_names if name not in dropped_columns]

    # "Stockholm", "Malmö", "Luleå", "Sundsvall"
    city_coordinates = pd.read_csv("https://repo.hops.works/dev/davit/electricity/city_coordinates.csv")
    city_coordinates = city_coordinates[city_coordinates.City == city_name]
    if city_coordinates.empty:
        raise ValueError(f"City {city_name!r} was not found in the city coordinates file")
    city_latitude = city_coordinates.latitude.values[0]
    city_longitude = city_coordinates.longitude.values[0]

    stations_url = f"https://opendata-download-metobs.smhi.se/api/version/latest/parameter/{parameter}.json"
    stations_resp = requests.get(url=stations_url, timeout=30)
    # Fail with the HTTP error itself instead of a JSON decoding error.
    stations_resp.raise_for_status()
    stations_pdf = pd.DataFrame(stations_resp.json()["station"])[
        ["name", "measuringStations", "id", "latitude", "longitude", "active", "key", "updated"]
    ]
    # Copy so that adding the distance column does not write into a slice.
    stations_pdf = stations_pdf[stations_pdf.active == True].copy()

    # Select stations within a STATIONS_WITHIN_DISTANCE km radius. A plain list
    # is used because DataFrame.apply(axis=1) misbehaves on an empty frame.
    stations_pdf["distance"] = [
        distance(city_latitude, city_longitude, station_latitude, station_longitude)
        for station_latitude, station_longitude in zip(stations_pdf.latitude, stations_pdf.longitude)
    ]
    stations_pdf = stations_pdf[stations_pdf.distance < STATIONS_WITHIN_DISTANCE]

    # String comparison on the day column: only rows after this day are kept.
    last_excluded_day = "2020-12-31"

    station_frames = []
    for station_id in stations_pdf.id:
        url = f"https://opendata-download-metobs.smhi.se/api/version/latest/parameter/{parameter}/station/{station_id}/period/{period}/data.csv"
        try:
            pdf = pd.read_csv(url, sep=';', skiprows=skiprows, names=column_names)
            pdf = pdf[pdf["day"] > last_excluded_day]
        except Exception as error:
            # Best effort: a station without usable data for this period is
            # skipped, but the reason is reported instead of being hidden.
            warnings.warn(
                f"Skipping SMHI station {station_id} (parameter {parameter}, period {period}): {error}"
            )
            continue
        station_frames.append(pdf)

    if not station_frames:
        return pd.DataFrame(columns=kept_columns)

    # Combine all stations once, then keep only the day/time/value columns.
    measurment_by_city = pd.concat(station_frames)
    return measurment_by_city[kept_columns].dropna()
    

def fetch_smhi_measurements(historical_data = False):
    """Build one row per day of SMHI weather features for each price area.

    Every measurement is downloaded with ``hsmi_measurment_data``, reduced to
    a single value per day and outer-merged on ``day`` into one frame whose
    value columns carry the area as suffix (for example
    ``mean_wind_speed_SE3``).

    Args:
        historical_data (bool): When True the "corrected-archive" and
            "latest-months" periods are downloaded and combined. When False
            only "latest-day" is downloaded, and the daily mean temperature
            is computed from the "temp_per_last_hour" measurement.
    Returns:
        pd.DataFrame: Frame sorted by ``day``. Missing values are replaced by
            "missing" for the precipitation type and by 0.0 for precipitation
            amount, wind speed, sunshine time and cloud cover; the temperature
            column is left unfilled.
    """
    measurements = ["mean_temp_per_day", "wind_speed", "precipitaton_type", "precipitaton_amount", "sunshine_time", "cloud_perc"]
    areas = ["SE3"]

    # measurement -> (CSV header labels that may appear as data rows,
    #                 per-day aggregation, name of the aggregated column)
    # NOTE(review): precipitaton_amount is averaged per day rather than
    # summed -- confirm this is the intended daily feature.
    daily_aggregations = {
        "mean_temp_per_day": (["Lufttemperatur", "Daggpunktstemperatur"], "mean", "mean_temp_per_day"),
        "wind_speed": (["Vindhastighet"], "mean", "mean_wind_speed"),
        "precipitaton_amount": (["Nederbördsmängd"], "mean", "precipitaton_amount"),
        "sunshine_time": (["Solskenstid"], "sum", "total_sunshine_time"),
        "cloud_perc": (["Total molnmängd"], "mean", "mean_cloud_perc"),
    }
    # Replacement for days on which a measurement is missing after the merge.
    fill_values = {
        "precipitaton_type": "missing",
        "precipitaton_amount": 0.0,
        "mean_wind_speed": 0.0,
        "total_sunshine_time": 0.0,
        "mean_cloud_perc": 0.0,
    }

    def _load(name, period):
        # Treat a missing result as an empty frame so later steps do not crash.
        loaded = hsmi_measurment_data(name, period)
        return pd.DataFrame(columns=["day", name]) if loaded is None else loaded

    meteorological_measurements = pd.DataFrame(columns=["day"])
    for measurement in measurements:
        meteorological_measurements_per_area = pd.DataFrame(columns=["day"])
        for area in areas:
            if historical_data:
                parts = [_load(measurement, "corrected-archive"), _load(measurement, "latest-months")]
            elif measurement == "mean_temp_per_day":
                # No daily mean exists yet for the current day: derive it from
                # the hourly values of the last day.
                hourly = _load("temp_per_last_hour", "latest-day").drop(columns="time", errors="ignore")
                hourly.columns = ["day", measurement]
                parts = [hourly]
            else:
                parts = [_load(measurement, "latest-day")]
            smhi_df = pd.concat(parts)

            if measurement in daily_aggregations:
                header_labels, how, output_name = daily_aggregations[measurement]
                # Remove CSV header rows that slipped into the data, convert
                # the values to float and reduce them to one value per day.
                values = smhi_df[~smhi_df[measurement].isin(header_labels)]
                values = values.assign(**{measurement: values[measurement].astype(float)})
                smhi_df = values.groupby("day")[measurement].agg(how).reset_index()
                smhi_df.columns = ["day", output_name]
            else:
                # Not aggregated (precipitaton_type): keep the first value per
                # day, because overlapping periods would otherwise repeat days
                # and the outer merge below would multiply the rows.
                smhi_df = smhi_df[["day", measurement]].drop_duplicates(subset="day", keep="first")

            smhi_df.columns = [smhi_df.columns[0], f"{smhi_df.columns[1]}_{area}"]
            meteorological_measurements_per_area = meteorological_measurements_per_area.merge(smhi_df, on=["day"], how = "outer")
        meteorological_measurements = meteorological_measurements.merge(meteorological_measurements_per_area, on=["day"], how = "outer")

    for area in areas:
        for column, fill_value in fill_values.items():
            column_name = f"{column}_{area}"
            meteorological_measurements[column_name] = meteorological_measurements[column_name].fillna(fill_value)

    return meteorological_measurements.sort_values(["day"])

In [32]:
# Backfill: download the historical periods rather than only the latest day.
weather_df = fetch_smhi_measurements(historical_data=True)



In [38]:
# Tag every row with its zone and parse the day strings into timestamps.
weather_df = weather_df.assign(
    zone="SE3",
    day=pd.to_datetime(weather_df['day'], format='%Y-%m-%d'),
)

In [40]:
# Fetch the feature group for the weather features, creating it if needed.
# NOTE(review): every row gets the same `zone` value ("SE3"), so `zone` alone
# does not distinguish rows -- confirm whether `day` should be part of the key.
se_weather_fg = fs.get_or_create_feature_group(
    name="se_weather_fg",
    description="Swedish weather features",
    version=1,
    event_time="day",
    primary_key=["zone"],
    statistics_config={'histograms': True, 'correlations': True},
    online_enabled=False,
)

In [41]:
# Upload the weather features to the feature group. Kept as the last
# expression of the cell so the notebook displays the value insert() returns.
se_weather_fg.insert(weather_df)



Feature Group created successfully, explore it at 
https://2a5f8040-2d0d-11ed-b5c5-c151c2fe58c1.cloud.hopsworks.ai:443/p/7287/fs/7235/fg/14359


Uploading Dataframe: 0.00% |          | Rows 0/676 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://2a5f8040-2d0d-11ed-b5c5-c151c2fe58c1.cloud.hopsworks.ai/p/7287/jobs/named/se_weather_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x7f77f082d0a0>, None)