In [1]:
import pandas as pd
import numpy as np
import csv, json
import geopandas as gpd
from shapely.geometry import Polygon, Point
import requests
import holidays
from pathlib import Path

from sklearn.preprocessing import OneHotEncoder, StandardScaler

import sys
sys.path.append('../')
from bikesharing.params import *
from bikesharing.ml_logic.data import get_raw_data

In [2]:
## Pre-processing functions ##

def pre_process_rental_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocesses the rental DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: The preprocessed DataFrame.
    """
    # Select relevant columns only
    df = df[['STARTTIME', 'STARTLAT', 'STARTLON']].copy()

    # Strip column names
    df.columns = [col.strip() for col in df.columns]

    # Remove column 'Row'
    df.drop(columns='Row', inplace=True, errors='ignore')

    # Make string replacements
    df_obj = df.select_dtypes(include='object')
    df[df_obj.columns] = df_obj.applymap(lambda x: x.strip().replace(',', '.') if isinstance(x, str) else x)

    # Handle datetime and numerical datatypes
    df.STARTTIME = pd.to_datetime(df.STARTTIME)
    df[['STARTLAT', 'STARTLON']] = df[['STARTLAT', 'STARTLON']].astype(np.float32)

    return df


def load_polygons(polygons_file: str) -> dict:
    """
    Loads polygons from a file and returns them as a dictionary.

    Args:
        polygons_file (str): The path to the polygons file.

    Returns:
        dict: The dictionary of polygons.
    """
    polygons = {}
    with open(polygons_file, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            polygons[row['district']] = Polygon(json.loads(row['coordinates']))

    return polygons


def get_district(rental_df: pd.DataFrame, polygons: dict) -> pd.DataFrame:
    """
    Performs a spatial join between the rental DataFrame and polygons.

    Args:
        rental_df (pd.DataFrame): The rental DataFrame.
        polygons (dict): The dictionary of polygons.

    Returns:
        pd.DataFrame: The DataFrame with the spatial join result.
    """
    # Create a DataFrame from the polygons dictionary
    polygons_df = pd.DataFrame.from_dict(polygons, orient='index', columns=['geometry'])
    # Reset the index to make the 'district' column a regular column
    polygons_df = polygons_df.reset_index().rename(columns={'index': 'district'})

    # Create a GeoDataFrame from the polygons DataFrame
    polygons_gdf = gpd.GeoDataFrame(polygons_df)
    # Set the geometry column in the polygons_gdf GeoDataFrame
    polygons_gdf.set_geometry('geometry', inplace=True)

    # Create a GeoDataFrame from the point data
    geometry = [Point(row['STARTLON'], row['STARTLAT']) for _, row in rental_df.iterrows()]
    rental_gdf = gpd.GeoDataFrame(rental_df, geometry=geometry)
    # Set the geometry column in the rental_gdf GeoDataFrame
    rental_gdf.set_geometry('geometry', inplace=True)

    # Perform the spatial join
    rental_geo_df = gpd.sjoin(rental_gdf, polygons_gdf, predicate='within')

    # Drop unnecessary columns
    rental_geo_df = rental_geo_df.drop(columns=['geometry', 'index_right'])

    return rental_geo_df


def encode_district_label(df: pd.DataFrame) -> pd.DataFrame:
    """
    Encodes the district labels in the DataFrame using one-hot encoding.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: The DataFrame with encoded district labels.
    """
    # Instantiate the OneHotEncoder
    district_ohe = OneHotEncoder(sparse=False)

    # Fit encoder
    district_ohe.fit(df[['district']])

    # Apply one-hot encoding and add the encoded columns to the DataFrame
    encoded_columns = district_ohe.get_feature_names_out()
    encoded_values = district_ohe.transform(df[['district']])
    df_encoded = pd.DataFrame(encoded_values, columns=encoded_columns)

    # Update the column names in df without the prefix 'district_'
    column_names = [column.split('district_', 1)[-1] for column in df_encoded.columns]
    df.columns = list(df.columns[:-len(encoded_columns)]) + column_names

    return df


def group_rental_data_by_hour(df: pd.DataFrame) -> pd.DataFrame:
    """
    Groups the rental data by hour.

    Args:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        pd.DataFrame: The DataFrame with rental data grouped by hour.
    """
    # Preprocessing
    df['date'] = df['STARTTIME'].dt.date
    df['year'] = df['STARTTIME'].dt.year
    df['month'] = df['STARTTIME'].dt.month
    df['hour'] = df['STARTTIME'].dt.hour
    df['date_hour'] = df['STARTTIME'].dt.floor('H')

    # Grouping by Hour
    df_by_hour = df.groupby('date_hour').agg({
        'RENTAL_IS_STATION': np.mean,
        'year': np.mean,
        'month': np.mean,
        'hour': np.mean,
        **{district: np.sum for district in df['district']}
    }).reset_index()

    return df_by_hour


def get_weather_info(rental_df: pd.DataFrame) -> pd.DataFrame:
    """
    Retrieves weather information and merges it with the rental data.

    Args:
        rental_df (pd.DataFrame): The rental DataFrame.

    Returns:
        pd.DataFrame: The DataFrame with merged rental and weather data.
    """
    def fetch_weather_data(latitude, longitude, start_date, end_date):
        # TODO: should be moved to env
        url = "https://archive-api.open-meteo.com/v1/era5"
        params = {
            'latitude': latitude,
            'longitude': longitude,
            'start_date': start_date,
            'end_date': end_date,
            'hourly': ['temperature_2m', 'relativehumidity_2m', 'apparent_temperature', 'windspeed_10m', 'precipitation']
        }
        weather_data = requests.get(url, params=params).json()
        df_weather = pd.DataFrame(weather_data['hourly'])
        df_weather['time'] = pd.to_datetime(df_weather['time'])

        return df_weather

    # Merge rental data with weather data
    df_weather = fetch_weather_data()
    merged_data = rental_df.merge(df_weather, left_on='date_hour', right_on='time', how='left').drop(columns='time')
    merged_data['date_hour'] = pd.to_datetime(merged_data['date_hour'])

    return merged_data


def feature_extraction(data: pd.DataFrame) -> tuple:
    """
    Performs feature engineering on the input data.

    Args:
        data (pd.DataFrame): The input DataFrame.

    Returns:
        tuple: A tuple containing the features DataFrame (X) and the target DataFrame (y).
    """
    data['date_hour'] = pd.to_datetime(data['date_hour'])

    # Extract date from date_hour
    data['date'] = data['date_hour'].dt.date

    # Select features for X and y
    X = data[['date', 'date_hour', 'year', 'month', 'hour', 'temperature_2m', 'relativehumidity_2m',
              'apparent_temperature', 'windspeed_10m', 'precipitation']]
    
    X['date'] = pd.to_datetime(X['date'])
    X['is_weekend'] = X['date'].dt.weekday >= 5

    bay_holidays = holidays.CountryHoliday('DE', prov='BY')
    X['is_holiday'] = X['date'].apply(lambda x: x in bay_holidays)
    
    y = data[['date_hour','Sendling-Westpark', 'Altstadt-Lehel', 'Schwabing-West', 'Untergiesing',
       'Untergiesing-Harlaching', 'Maxvorstadt', 'Bogenhausen', 'Sendling',
       'Milbertshofen-Am Hart', 'Neuhausen-Nymphenburg', 'Moosach',
       'Obergiesing', 'Au - Haidhausen', 'Ludwigsvorstadt-Isarvorstadt',
       'Laim', 'Schwanthalerhöhe', 'Schwabing-Freimann', 'Ramersdorf-Perlach',
       'Thalkirchen', 'Aubing-Lochhausen-Langwied', 'Hadern', 'Berg am Laim',
       'Harlaching', 'Obersendling', 'Südgiesing', 'Pasing',
       'Pasing-Obermenzing', 'Hasenbergl-Lerchenau Ost', 'Obermenzing',
       'Trudering', 'Trudering-Riem', 'Feldmoching', 'Untermenzing-Allach',
       'Lochhausen']].copy()
    
    # Add sine and cosine transformations of time-related features
    time_features = ['hour', 'month', 'date_hour.dt.weekday']
    for feature in time_features:
        X[f'{feature}_sin'] = np.sin(2 * np.pi * X[feature] / X[feature].max())
        X[f'{feature}_cos'] = np.cos(2 * np.pi * X[feature] / X[feature].max())

    return X, y


def scale_numeric_features(data: pd.DataFrame, numeric_features: list) -> pd.DataFrame:
    """
    Scales the numeric features in the given DataFrame using StandardScaler.

    Args:
        data (pd.DataFrame): The input DataFrame.
        numeric_features (list): List of column names representing the numeric features to scale.

    Returns:
        pd.DataFrame: The DataFrame with scaled numeric features.
    """
    scaled_data = data.copy()

    # Scale the numeric features
    scaler = StandardScaler()
    scaled_data[numeric_features] = scaler.fit_transform(scaled_data[numeric_features])

    return scaled_data


In [19]:
# Define the SQL query to fetch rental data from BigQuery
query = f'''SELECT STARTTIME, STARTLAT, STARTLON
         FROM `{GCP_PROJECT}.{BQ_DATASET}.raw_data_mvg`
         WHERE STARTTIME >= '{START_YEAR}-01-01' AND STARTTIME <= '{END_YEAR}-12-31' 
         '''

# Fetch the rental data from BigQuery
cache_path = Path(LOCAL_DATA_PATH).joinpath("raw", f"raw_{START_YEAR}_{END_YEAR}.csv")
rentals_df = get_raw_data(GCP_PROJECT, query, cache_path)
print(rentals_df.shape)
rentals_df.head(3)

[34m
Load data from BigQuery server...[0m
columns: Index(['STARTTIME', 'STARTLAT', 'STARTLON'], dtype='object')
✅ Data loaded, with shape (2802995, 3)
(2802995, 3)


Unnamed: 0,STARTTIME,STARTLAT,STARTLON
0,2019-01-01 02:47:00,48.088402,11.4806
1,2019-01-01 04:00:00,48.105709,11.41446
2,2019-01-01 15:29:00,48.155258,11.54012


In [20]:
# Preprocess the rental data
rental_df_processed = pre_process_rental_df(rentals_df)
print(rental_df_processed.shape)
rental_df_processed.head(3)

(2802995, 3)


Unnamed: 0,STARTTIME,STARTLAT,STARTLON
0,2019-01-01 02:47:00,48.088402,11.4806
1,2019-01-01 04:00:00,48.105709,11.41446
2,2019-01-01 15:29:00,48.155258,11.54012


In [21]:
# Data Quality Check
duplicate_count = rental_df_processed.duplicated().sum()
print("Number of Duplicate Rows:", duplicate_count)

Number of Duplicate Rows: 23391


In [15]:
# Load polygons from the file
polygons_file_path = '../raw_data/polygons.csv'
polygons = load_polygons(polygons_file_path)
print(len(polygons))

# Perform spatial join between rental data and polygons
rental_geo_df = get_district(rental_df_processed, polygons)
rental_geo_df.head(3)
print(rental_geo_df.shape)

36
(681468, 4)


In [None]:
# Encode district labels using one-hot encoding
encoded_rental_df = encode_district_label(rental_geo_df)

In [None]:
# Group rental data by hour
rental_df_by_hour = group_rental_data_by_hour(encoded_rental_df)

# Retrieve weather information and merge with rental data
rental_df_all = get_weather_info(rental_df_by_hour)

# Perform feature engineering on the merged data
X, y = feature_extraction(rental_df_all)

# Scale the numeric features in X using StandardScaler
X_scaled = scale_numeric_features(X)