In [57]:
from datetime import datetime, timedelta
import os
import pandas as pd

current_date = pd.to_datetime(datetime.utcnow())
print(f"{current_date}")

2025-05-10 10:49:00.206935


In [58]:
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# Get current time in Eastern Time (EST/EDT)
current_date = datetime.now()
current_date = pd.to_datetime(current_date).floor('h')

print(f"{current_date}")

2025-05-10 06:00:00


In [59]:
fetch_data_to = current_date
fetch_data_from = current_date - timedelta(days=29)
fetch_data_from = pd.to_datetime(fetch_data_from).floor('h')
fetch_data_to =  pd.to_datetime(fetch_data_to).floor('h')

In [60]:
## Headers

from datetime import datetime
from typing import List, Optional, Tuple, Union
from src.config import RAW_DATA_DIR

# Pre-Req functions
def fill_missing_rides_full_range(df, hour_col, location_col, rides_col):
    """
    Fills in missing rides for all hours in the range and all unique locations.

    Parameters:
    - df: DataFrame with columns [hour_col, location_col, rides_col]
    - hour_col: Name of the column containing hourly timestamps
    - location_col: Name of the column containing location IDs
    - rides_col: Name of the column containing ride counts

    Returns:
    - DataFrame with missing hours and locations filled in with 0 rides
    """
    # Ensure the hour column is in datetime format
    df[hour_col] = pd.to_datetime(df[hour_col])

    # Get the full range of hours (from min to max) with hourly frequency
    full_hours = pd.date_range(
        start=df[hour_col].min(),
        end=df[hour_col].max(),
        freq="h"
    )

    # Get all unique location IDs
    all_locations = df[location_col].unique()

    # Create a DataFrame with all combinations of hours and locations
    full_combinations = pd.DataFrame(
        [(hour, location) for hour in full_hours for location in all_locations],
        columns=[hour_col, location_col]
    )

    # Merge the original DataFrame with the full combinations DataFrame
    merged_df = pd.merge(full_combinations, df, on=[hour_col, location_col], how='left')

    # Fill missing rides with 0
    merged_df[rides_col] = merged_df[rides_col].fillna(0).astype(int)

    return merged_df
# -----------------------------Data Cleaning Start-----------------------------

def load_and_process_citi_data(years: list,  months: Optional[List[int]] = None) -> pd.DataFrame:
    if months is None:
        months = list(range(1, 13))
    # List to store DataFrames for each month
    monthly_rides = []

    for year in years:
        for month in months:
            # Construct the file path
            file_path = RAW_DATA_DIR / f"JC-{year}{month:02}-citibike-tripdata.csv"

            # Load the data
            print(f"Loading data for {year}-{month:02}.")
            try:
                rides = pd.read_csv(file_path)
            # Append the processed DataFrame to the list
                monthly_rides.append(rides)
            except FileNotFoundError:
                continue

        # Combine all monthly data
        if not monthly_rides:
            raise Exception(
                f"No data could be loaded for the year {year} and specified months: {months}"
            )

        print("Combining all monthly data...")
        combined_rides = pd.concat(monthly_rides, ignore_index=True)
        print("Data loading and processing complete!")

        columns_to_drop = ['ride_id', 'end_station_name', 'rideable_type', 'ended_at', 'end_station_id','start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual']  # Specify the columns to drop
        processed_rides = combined_rides.drop(columns=columns_to_drop)

        processed_rides.rename(columns={"started_at": "pickup_hour", "start_station_name": "station_name", "start_station_id": "station_id"}, inplace=True)        

    return combined_rides, processed_rides


def return_data_for_model(df):
    # Filter data for the desired data locations
    station_ids = ['HB101', 'HB105', 'HB305']  # Replace with your list of station_id values
    df = df[df["station_id"].isin(station_ids)]

    # convert the datatype for pickup
    df["pickup_hour"] = pd.to_datetime(df["pickup_hour"])
    df["pickup_hour"] = df["pickup_hour"].dt.floor('h') # floor is to the nearest hour

    # Group data and make it more better looking
    df = df.groupby(["pickup_hour", "station_id"]).size().reset_index()
    df.rename(columns={0: "rides"}, inplace=True)

    hour_col = "pickup_hour"
    location_col = "station_id"
    rides_col = "rides"
    interval = "6H"
    df = fill_missing_rides_full_range(df, hour_col, location_col, rides_col).sort_values(["station_id", "pickup_hour"]).reset_index(drop=True)

    particular_date_6h = datetime(2021, 7, 17)
    df = df[df["pickup_hour"] >= particular_date_6h]     

    # Set the hour column as the index
    df = df.set_index(hour_col)
#
    ## Resample and aggregate rides
    df = df.groupby("station_id").resample(interval)[rides_col].sum().reset_index()

    return df


In [61]:
from datetime import datetime, timedelta
from typing import Union
import pandas as pd

def fetch_batch_raw_data(from_date: Union[datetime, str], to_date: Union[datetime, str]) -> pd.DataFrame:
    """
    Simulate production data by sampling historical data from 52 weeks ago (i.e., 1 year).

    Args:
        from_date (datetime or str): The start date for the data batch.
        to_date (datetime or str): The end date for the data batch.

    Returns:
        pd.DataFrame: A DataFrame containing the simulated production data.
    """
    # Convert string inputs to datetime if necessary
    if isinstance(from_date, str):
        from_date = datetime.fromisoformat(from_date)
    if isinstance(to_date, str):
        to_date = datetime.fromisoformat(to_date)

    # Validate input dates
    if not isinstance(from_date, datetime) or not isinstance(to_date, datetime):
        raise ValueError("Both 'from_date' and 'to_date' must be datetime objects or valid ISO format strings.")
    if from_date >= to_date:
        raise ValueError("'from_date' must be earlier than 'to_date'.")

    # Shift dates back by 52 weeks (1 year)
    historical_from_date = from_date - timedelta(weeks=52)
    historical_to_date = to_date - timedelta(weeks=52)

    # Load and filter data for the historical period
    year = [historical_from_date.year]
    a , rides_from = load_and_process_citi_data(year, months=[historical_from_date.month])
    rides_from['pickup_hour'] = pd.to_datetime(rides_from['pickup_hour'])
    historical_from_date = pd.to_datetime(historical_from_date)
    rides_from = rides_from[rides_from.pickup_hour >= historical_from_date]

    if historical_to_date.month != historical_from_date.month:
        a , rides_to = load_and_process_citi_data(year, months=[historical_to_date.month])
        rides_to['pickup_hour'] = pd.to_datetime(rides_to['pickup_hour'])
        historical_to_date = pd.to_datetime(historical_to_date)
        rides_to = rides_to[rides_to.pickup_hour < historical_to_date]
        # Combine the filtered data
        rides = pd.concat([rides_from, rides_to], ignore_index=True)
    else:
        rides = rides_from
    # Shift the data forward by 52 weeks to simulate recent data
    rides['pickup_hour'] += timedelta(weeks=52)

    # Sort the data for consistency
    rides.sort_values(by=['station_id', 'pickup_hour'], inplace=True)

    return rides

In [62]:
rides = fetch_batch_raw_data(fetch_data_from, fetch_data_to)

Loading data for 2024-04.
Combining all monthly data...
Data loading and processing complete!
Loading data for 2024-05.
Combining all monthly data...
Data loading and processing complete!


In [63]:
rides

Unnamed: 0,pickup_hour,station_name,station_id
5594,2025-04-15 14:40:42,Hope St & Union Ave,5187.03
5035,2025-04-15 14:27:56,Devoe St & Morgan Ave,5282.02
48606,2025-04-16 09:23:25,E 11 St & 1 Ave,5746.14
4877,2025-04-19 18:15:55,Broadway & E 19 St,6098.12
2470,2025-04-29 07:53:14,E 33 St & 5 Ave,6322.01
...,...,...,...
54677,2025-04-26 10:52:51,,
54682,2025-04-26 15:45:23,,
53814,2025-04-26 19:16:49,,
54181,2025-04-27 15:01:41,,


In [64]:
ts_data = return_data_for_model(rides)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [65]:
ts_data

Unnamed: 0,station_id,pickup_hour,rides
0,HB101,2025-04-11 06:00:00,9
1,HB101,2025-04-11 12:00:00,32
2,HB101,2025-04-11 18:00:00,35
3,HB101,2025-04-12 00:00:00,2
4,HB101,2025-04-12 06:00:00,2
...,...,...,...
343,HB305,2025-05-09 00:00:00,1
344,HB305,2025-05-09 06:00:00,11
345,HB305,2025-05-09 12:00:00,28
346,HB305,2025-05-09 18:00:00,3


In [66]:
api_key = os.getenv('HOPSWORKS_API_KEY')  
project_name = os.getenv('HOPSWORKS_PROJECT_NAME')  
grp_name = os.getenv('FEATURE_GROUP_NAME')
grp_ver = os.getenv('FEATURE_GROUP_VERSION')

FEATURE_VIEW_NAME = os.getenv('FEATURE_VIEW_NAME')
FEATURE_VIEW_VERSION = os.getenv('FEATURE_VIEW_VERSION')

In [69]:
import hopsworks

project = hopsworks.login(  
    api_key_value=api_key,  
    project=project_name  
)  

feature_store = project.get_feature_store()
feature_group = feature_store.get_feature_group(
    name=grp_name,
    version=grp_ver
)

2025-05-10 06:51:52,189 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-10 06:51:52,193 INFO: Initializing external client
2025-05-10 06:51:52,194 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-10 06:51:52,792 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1225934


In [70]:
# Create a feature view if it doesn't already exist
try:
    feature_store.create_feature_view(
        name=FEATURE_VIEW_NAME,
        version=FEATURE_VIEW_VERSION,
        query=feature_group.select_all(),
    )
    print(f"Feature view '{FEATURE_VIEW_NAME}' (version {FEATURE_VIEW_VERSION}) created successfully.")
except Exception as e:
    print(f"Error creating feature view: {e}")

# Retrieve the feature view
try:
    feature_view = feature_store.get_feature_view(
        name = FEATURE_VIEW_NAME,
        version = FEATURE_VIEW_VERSION,
    )
    print(f"Feature view '{FEATURE_VIEW_NAME}' (version {FEATURE_VIEW_VERSION}) retrieved successfully.")
except Exception as e:
    print(f"Error retrieving feature view: {e}")

Error creating feature view: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1225934/featurestores/1212531/featureview). Server response: 
HTTP code: 400, HTTP reason: Bad Request, body: b'{"errorCode":270179,"usrMsg":"Feature view: time_series_6_hour_feature_view, version: 1","errorMsg":"The provided feature view name and version already exists"}', error code: 270179, error msg: The provided feature view name and version already exists, user msg: Feature view: time_series_6_hour_feature_view, version: 1
Feature view 'time_series_6_hour_feature_view' (version 1) retrieved successfully.


In [71]:
ts_data, _ = feature_view.training_data(
    description="Time Series data for Bike Share"
)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (4.28s) 




In [72]:
ts_data = ts_data.sort_values(["station_id", "pickup_hour"]).reset_index(drop=True)

In [73]:
ts_data

Unnamed: 0,pickup_hour,station_id,rides
0,2023-12-19 19:00:00+00:00,3132.09,1
1,2023-12-04 07:00:00+00:00,3263.01,1
2,2023-09-20 22:00:00+00:00,3710.07,1
3,2023-12-01 10:00:00+00:00,3814.01,1
4,2023-12-02 21:00:00+00:00,3914.02,1
...,...,...,...
1217215,2021-08-26 20:00:00+00:00,JCSYS,2
1217216,2022-05-13 09:00:00+00:00,JCSYS,1
1217217,2022-06-08 10:00:00+00:00,JCSYS,1
1217218,2022-10-03 13:00:00+00:00,MTL-ECO5-LAB,1
