In [1]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from functions import utils
import json
import os
import git
import warnings
warnings.filterwarnings("ignore")

In [2]:
from dotenv import load_dotenv
load_dotenv()
HOPSWORKS_API_KEY = os.getenv("HOPSWORKS_API_KEY")
os.environ["HOPSWORKS_API_KEY"] = HOPSWORKS_API_KEY

## Get the necessary data from Hopsworks

In [3]:
project = hopsworks.login()
fs = project.get_feature_store() 
# secrets = utils.secrets_api(project.name)

CITY = "dublin"
STATION = "HEUSTON BRIDGE (NORTH)"

# latitude =
# longitude =

today = datetime.datetime.now()

2025-01-10 09:00:22,646 INFO: Initializing external client


2025-01-10 09:00:22,649 INFO: Base URL: https://c.app.hopsworks.ai:443


2025-01-10 09:00:26,646 INFO: Python Engine initialized.



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1207494


In [4]:
# Retrieve feature groups
bike_fg = fs.get_feature_group(
    name='bike_data',
    version=1,
)
weather_fg = fs.get_feature_group(
    name='weather_data',
    version=1,
)


## Clone and pull the repository with the bike data

In [5]:
# Configuration
REPO_URL = "https://github.com/MaxHalford/bike-sharing-history"
CLONE_DIR = "./bike_data/bike-sharing-history"
TARGET_CITY = "dublin"
FILE_NAME = "jcdecaux.geojson"

In [6]:
if not os.path.exists(CLONE_DIR):
    print("Cloning repository...")
    git.Repo.clone_from(REPO_URL, CLONE_DIR)
repo = git.Repo(CLONE_DIR)

# go to main and pull the latest changes
repo.git.checkout("main", force=True)
repo.remotes.origin.pull()

[<git.remote.FetchInfo at 0x1eaaab589a0>,
 <git.remote.FetchInfo at 0x1eaaab345e0>]

## Get the latest datetime present in the bike data

In [7]:
# last_bike_datetime = "2025-01-06 15:06:11 UTC"

bike_df = bike_fg.read()

last_bike_datetime = bike_df["datetime"].max()
last_bike_datetime = last_bike_datetime.strftime("%Y-%m-%d %H:%M:%S %Z")

last_bike_datetime

Reading data from Hopsworks, using Hopsworks Feature Query Service.   

Reading data from Hopsworks, using Hopsworks Feature Query Service..   

Reading data from Hopsworks, using Hopsworks Feature Query Service...   

Reading data from Hopsworks, using Hopsworks Feature Query Service   

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.92s) 


'2025-01-08 23:49:15 UTC'

## Loop through the commits and convert the bike data into a dataframe

In [8]:
results = {}

# Populate the results dict with the stations
data_file = os.path.join(CLONE_DIR, "data/stations/" + TARGET_CITY + "/" + FILE_NAME)
if os.path.exists(data_file):
    with open(data_file, "r") as f:
        data = f.read()
        data = json.loads(data)
        for feature in data["features"]:
            name = feature["properties"]["name"]
            results[name] = []

In [9]:
start_time = datetime.datetime.now()
last_day_and_hour = None

for commit in repo.iter_commits():
    # Stop when we reach the earliest bike date
    if commit.committed_datetime <= datetime.datetime.strptime(last_bike_datetime, "%Y-%m-%d %H:%M:%S %Z").replace(tzinfo=datetime.timezone.utc):
        print("breaking at: ", commit.committed_datetime)
        break

    # Skip commits from today
    if commit.committed_datetime > today.replace(tzinfo=datetime.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0):
        first_commited_datetime = repo.commit().committed_datetime
        continue

    day_and_hour = commit.committed_datetime.replace(minute=0, second=0, microsecond=0)
    # print("day_and_hour: ", day_and_hour, " - last_day_and_hour: ", last_day_and_hour)
    if day_and_hour == last_day_and_hour:
        continue
    last_day_and_hour = day_and_hour

    # Get the data for the commit
    print("Processing commit: ", commit.hexsha, " - ", commit.committed_datetime)
    try:
        repo.git.checkout(commit.hexsha, force=True)
    except Exception as e:
        print("Error checking out commit: ", e)
        break

    data_file = os.path.join(CLONE_DIR, "data/stations/" + TARGET_CITY + "/" + FILE_NAME)
    if os.path.exists(data_file):
        with open(data_file, "r") as f:
            data = f.read()
            data = json.loads(data)
            for feature in data["features"]:
                try:
                    results[feature["properties"]["name"]].append([feature["properties"]["available_bikes"], commit.committed_datetime])
                except KeyError:
                    continue

print(results)


Processing commit:  c1f70b2f6250d3733968bba343d728ce9e1dbfdc  -  2025-01-09 23:49:15+00:00


Processing commit:  2052fe5e4ac18ce2cc335ff35f832e5b9e65acd1  -  2025-01-09 22:49:06+00:00


Processing commit:  de5a3381241aab9a7e3f7afc332e3e2f1039e48d  -  2025-01-09 21:49:07+00:00


Processing commit:  45048800c49f5ed76fcbb15b4d65c9d8cc491d0c  -  2025-01-09 20:48:58+00:00


Processing commit:  44d730ac1545e77ac7e488db5151db574f1fb73c  -  2025-01-09 19:49:05+00:00


Processing commit:  c71eba4008b8aaf81bdd219b4b14ff413c96bbec  -  2025-01-09 18:50:05+00:00


Processing commit:  fa9b52af8efe213d98c1b3ea851a4b9723a7e9b3  -  2025-01-09 17:49:03+00:00


Processing commit:  f59f91e66575f67a97d3c1af9bf5ed595e9fe373  -  2025-01-09 16:50:14+00:00


Processing commit:  99a5fc5adb84b1f0a632373bd336835ff05bc27d  -  2025-01-09 15:48:59+00:00


Processing commit:  f78e6cffb8967e5f1a827a4c4cbc89b8bd24efb7  -  2025-01-09 14:49:45+00:00


Processing commit:  7101febd5ebecebf4c515dd030f1e040bdf33c55  -  2025-01-09 13:49:13+00:00


Processing commit:  379d04c4ae740452ece08c8c73a6d1257ebc8279  -  2025-01-09 12:46:51+00:00


Processing commit:  321fa9dec448b03ca8676b956003298dc073f67c  -  2025-01-09 11:49:24+00:00


Processing commit:  842cca0dee7484f87c8bf45885a11b2b271f5070  -  2025-01-09 10:48:59+00:00


Processing commit:  7d896acfad7a0a9a5bc5e322443ef7fb34577df7  -  2025-01-09 09:49:05+00:00


Processing commit:  58de5919e6a2402f9d04c8b5f5b3833c1951afeb  -  2025-01-09 08:49:51+00:00


Processing commit:  9f0b8a03deae8c84ab3350c9acb13a864a842db6  -  2025-01-09 07:35:07+00:00


Processing commit:  3c14d6c7ebf7c10380014a85ba2e8263e836ba89  -  2025-01-09 06:39:54+00:00


Processing commit:  e569c43feed7b61d84a20d7902e267e231779719  -  2025-01-09 04:49:17+00:00


Processing commit:  a64148660006c94dd4400110b850b00cc724f5f8  -  2025-01-09 03:56:10+00:00


Processing commit:  f64b25d2b9ea0ed94b962e18e8213c0949a8ae55  -  2025-01-09 02:30:22+00:00


Processing commit:  3b98c8b58810240bf3ffde18c68d623af0c19647  -  2025-01-09 00:49:56+00:00


breaking at:  2025-01-08 23:49:15+00:00
{'CLARENDON ROW': [[0, datetime.datetime(2025, 1, 9, 23, 49, 15, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACC1720>)], [0, datetime.datetime(2025, 1, 9, 22, 49, 6, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACC24A0>)], [0, datetime.datetime(2025, 1, 9, 21, 49, 7, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACC1810>)], [1, datetime.datetime(2025, 1, 9, 20, 48, 58, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACC1780>)], [1, datetime.datetime(2025, 1, 9, 19, 49, 5, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACD6680>)], [8, datetime.datetime(2025, 1, 9, 18, 50, 5, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAACDFC40>)], [12, datetime.datetime(2025, 1, 9, 17, 49, 3, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAAE2BEE0>)], [15, datetime.datetime(2025, 1, 9, 16, 50, 14, tzinfo=<git.objects.util.tzoffset object at 0x000001EAAAE22800>)], [17, datetime.datetime(2025, 1, 9, 15, 4

In [10]:
# turn results into a dataframe
df_bike_today = pd.DataFrame()

for station, values in results.items():
    if len(values) > 0:
        df = pd.DataFrame(values, columns=["num_bikes_available", "datetime"])
        df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
        df["station"] = station.replace(" ", "_")
        df_bike_today = pd.concat([df_bike_today, df])

# if empty, do nothing
if not df_bike_today.empty:
    df_bike_today.dropna(inplace=True)
    df_bike_today["num_bikes_available"] = df_bike_today["num_bikes_available"].astype("float32")
    df_bike_today = df_bike_today[df_bike_today['station'].isin([STATION.replace(" ", "_")])]
    df_bike_today


## Fetch the weather data for the same time period

In [11]:
forecast_df = utils.get_hourly_weather_forecast(CITY)
forecast_df = forecast_df.rename(columns={'date_x': 'datetime'})
forecast_df = forecast_df.drop(columns=['date_y', 'date_only'])
forecast_df.dropna(inplace=True)

print(forecast_df.empty)

forecast_df

features: {'hourly': ['temperature_2m', 'apparent_temperature', 'rain', 'snowfall', 'wind_speed_10m'], 'daily': ['daylight_duration', 'rain_sum']}
params: {'latitude': 53.35, 'longitude': -6.26, 'hourly': ['temperature_2m', 'apparent_temperature', 'rain', 'snowfall', 'wind_speed_10m'], 'daily': ['daylight_duration', 'rain_sum']}
Coordinates 53.5°N -6.25°E
Elevation 11.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


False


Unnamed: 0,datetime,temperature_2m,apparent_temperature,rain,snowfall,wind_speed_10m,daylight_duration,rain_sum,city
0,2025-01-10 00:00:00+00:00,-2.35,-5.730032,0.0,0.0,5.804825,28276.632812,0.0,dublin
1,2025-01-10 01:00:00+00:00,-2.05,-5.314920,0.0,0.0,5.154416,28276.632812,0.0,dublin
2,2025-01-10 02:00:00+00:00,-1.50,-4.547288,0.0,0.0,3.893995,28276.632812,0.0,dublin
3,2025-01-10 03:00:00+00:00,-0.80,-3.824788,0.0,0.0,4.104631,28276.632812,0.0,dublin
4,2025-01-10 04:00:00+00:00,0.10,-3.213235,0.0,0.0,6.489992,28276.632812,0.0,dublin
...,...,...,...,...,...,...,...,...,...
235,2025-01-19 19:00:00+00:00,7.50,2.203819,0.0,0.0,25.704100,29701.466797,3.6,dublin
236,2025-01-19 20:00:00+00:00,7.30,2.118057,0.0,0.0,24.912935,29701.466797,3.6,dublin
237,2025-01-19 21:00:00+00:00,7.20,2.150184,0.0,0.0,24.122686,29701.466797,3.6,dublin
238,2025-01-19 22:00:00+00:00,7.15,2.323161,0.0,0.0,22.771244,29701.466797,3.6,dublin


## Insert the bike and weather data into Hopsworks

In [12]:
if df_bike_today.empty:
    print("No bike data available for today")
else:
    bike_fg.insert(df_bike_today)

Uploading Dataframe: 0.00% |                                      | Rows 0/22 | Elapsed Time: 00:00 | Remaining Time: ?

Uploading Dataframe: 50.00% |████████████████                | Rows 11/22 | Elapsed Time: 00:01 | Remaining Time: 00:01

Uploading Dataframe: 100.00% |███████████████████████████████| Rows 22/22 | Elapsed Time: 00:01 | Remaining Time: 00:00




Launching job: bike_data_1_offline_fg_materialization


Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207494/jobs/named/bike_data_1_offline_fg_materialization/executions


In [13]:
if forecast_df.empty:
    print("No weather forecast available for today")
else:
    weather_fg.insert(forecast_df, write_options={"wait_for_job": True})


Uploading Dataframe: 0.00% |                                     | Rows 0/240 | Elapsed Time: 00:00 | Remaining Time: ?

Uploading Dataframe: 0.42% |▏                                | Rows 1/240 | Elapsed Time: 00:01 | Remaining Time: 04:14

Uploading Dataframe: 100.00% |█████████████████████████████| Rows 240/240 | Elapsed Time: 00:01 | Remaining Time: 00:00




Launching job: weather_data_1_offline_fg_materialization


Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207494/jobs/named/weather_data_1_offline_fg_materialization/executions


2025-01-10 09:01:25,185 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED


2025-01-10 09:01:28,362 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED


2025-01-10 09:03:23,100 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED


2025-01-10 09:03:23,266 INFO: Waiting for log aggregation to finish.


2025-01-10 09:03:31,853 INFO: Execution finished successfully.
