In [None]:
#!pip install --pre pandas==2.0.*

In [None]:
import os
import time
from datetime import datetime

import boto3
import duckdb
import hopsworks
import pandas as pd

In [None]:
# duckdb.__version__

In [None]:
# Display the pandas version in use (the install cell at the top pins pandas 2.0.*).
pd.__version__

In [None]:
# Local scratch space used by DuckDB.
MAX_MEMORY = "35GB"  # DuckDB memory limit; raise to about 75% of the memory available to Python
TMP_DIR = "pit-data-v8"
DUCKDB_FILE = f"{TMP_DIR}/taxi.duckdb"
DATA_FOLDER = f"{TMP_DIR}/taxidata"

# S3 Uploads
# Credentials are read from the environment so that secrets never have to be
# saved inside the notebook; when a variable is unset the value falls back to
# the empty-string placeholder.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
AWS_REGION = 'us-east-2'
BUCKET = "hopsworks-bench-datasets"
# Pass the region explicitly so boto3 targets the same region that is later
# configured for DuckDB's S3 access.
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION,
)
s3 = session.resource('s3')

# HDFS Uploads
# Same approach as the AWS credentials: environment first, empty placeholder otherwise.
HOPS_HOST = os.environ.get("HOPSWORKS_HOST", "")
HOPS_API_KEY = os.environ.get("HOPSWORKS_API_KEY", "")
HDFS_PATH = "/Projects/testproj/Resources/"


In [None]:
# Create the scratch directories before DuckDB is asked to open its database
# file inside TMP_DIR. os.makedirs(exist_ok=True) is the portable equivalent of
# `mkdir -p` and avoids interpolating Python values into a shell command.
os.makedirs(TMP_DIR, exist_ok=True)
os.makedirs(DATA_FOLDER, exist_ok=True)


In [None]:
# Open (or create) the on-disk DuckDB database with a memory cap and with its
# temporary files directed to the scratch directory.
duckdb_settings = {'memory_limit': MAX_MEMORY, 'temp_directory': TMP_DIR}
con = duckdb.connect(DUCKDB_FILE, config=duckdb_settings)

# Install both extensions first, then load them (same order as issuing the
# four statements one by one).
duckdb_extensions = ("httpfs", "parquet")
for extension_name in duckdb_extensions:
    con.execute(f"INSTALL {extension_name};")
for extension_name in duckdb_extensions:
    con.execute(f"LOAD {extension_name};")

# Give DuckDB the S3 region and credentials it needs to read from the bucket.
s3_settings_sql = f"""
    SET s3_region='{AWS_REGION}';
    SET s3_access_key_id='{AWS_ACCESS_KEY}';
    SET s3_secret_access_key='{AWS_SECRET_ACCESS_KEY}';
    """
con.execute(s3_settings_sql)

In [None]:
def get_raw_data(limit, offset):
    """Read a window of the cleaned taxi data from S3 into a DataFrame.

    Args:
        limit: Maximum number of rows to read.
        offset: Number of rows to skip before reading; also the first
            ``row_id`` assigned to the returned rows.

    Returns:
        A pandas DataFrame with the parquet columns preceded by a ``row_id``
        column holding ``offset, offset + 1, ...``.
    """
    file_path = f's3://{BUCKET}/taxidata_cleaned/*.parquet'
    # int() guarantees that only plain numbers are spliced into the SQL text.
    limit, offset = int(limit), int(offset)
    # OFFSET makes each batch read a different window of rows; without it every
    # call returned the same first `limit` rows under different row_ids.
    # NOTE(review): there is no ORDER BY, so which rows fall into a window
    # depends on DuckDB's scan order — confirm it is stable across calls.
    query = f"SELECT * FROM read_parquet('{file_path}') LIMIT {limit} OFFSET {offset};"
    raw_data = con.execute(query).df()
    # Assign first (overwriting any existing row_id column), then move it to the front.
    raw_data['row_id'] = range(offset, offset + len(raw_data))
    raw_data.insert(0, 'row_id', raw_data.pop('row_id'))
    return raw_data

In [None]:
def filter_df_by_ts(df, ts_column, start_date, end_date):
    """Keep the rows of ``df`` whose ``ts_column`` lies in ``[start_date, end_date)``.

    Args:
        df: DataFrame to filter.
        ts_column: Name of the column to compare; when falsy, ``df`` is
            returned unchanged.
        start_date: Inclusive lower bound; skipped when falsy.
        end_date: Exclusive upper bound; skipped when falsy.

    Returns:
        The filtered DataFrame, or ``df`` itself when no bound is applied.
    """
    if not ts_column:
        return df

    selected = df
    if start_date:
        on_or_after_start = selected[ts_column] >= start_date
        selected = selected[on_or_after_start]
    if end_date:
        before_end = selected[ts_column] < end_date
        selected = selected[before_end]
    return selected

In [None]:
def pickup_features_fn(df, ts_column, start_date, end_date):
    """Aggregate fare statistics per pickup location and 15-minute window.

    The input is first restricted with ``filter_df_by_ts``; the pickup
    timestamp is then floored to 15 minutes and trips are grouped by pickup
    location, pickup borough and that window. The input frame is not modified.

    Args:
        df: Trip-level DataFrame with ``tpep_pickup_datetime``,
            ``pu_location_id``, ``pu_borough`` and ``fare_amount`` columns.
        ts_column: Column used for the date-range filter.
        start_date: Inclusive lower bound for the filter (skipped when falsy).
        end_date: Exclusive upper bound for the filter (skipped when falsy).

    Returns:
        DataFrame with columns ``row_id``, ``location_id``, ``borough``,
        ``ts`` (string formatted as ``%Y-%m-%d %H:%M:%S``),
        ``mean_fare_window_1h_pickup_zip`` and
        ``count_trips_window_1h_pickup_zip``.
    """
    trips = filter_df_by_ts(df, ts_column, start_date, end_date)
    # Build the window key as a standalone Series instead of writing columns
    # into `trips`: the filter may hand back the caller's frame or a slice of
    # it, so assigning there would mutate the input or raise
    # SettingWithCopyWarning.
    # NOTE(review): the window is 15 minutes although the feature names say
    # "1h" — confirm which one is intended.
    window = (
        pd.to_datetime(trips['tpep_pickup_datetime'])
        .dt.floor('15min')
        .dt.strftime('%Y-%m-%d %H:%M:%S')
        .rename('window')
    )
    pickup_features = (
        trips.groupby(['pu_location_id', 'pu_borough', window])
        .agg(
            mean_fare_window_1h_pickup_zip=('fare_amount', 'mean'),
            count_trips_window_1h_pickup_zip=('fare_amount', 'count')
        )
        .reset_index()
        .rename(columns={'pu_location_id': 'location_id', 'pu_borough': 'borough', 'window': 'ts'})
    )
    # Sequential surrogate key as the first column.
    pickup_features.insert(0, 'row_id', range(len(pickup_features)))

    return pickup_features

def dropoff_features_fn(df, ts_column, start_date, end_date):
    """Count trips per dropoff location and 30-minute window.

    The input is first restricted with ``filter_df_by_ts``; the dropoff
    timestamp is then floored to 30 minutes and trips are grouped by dropoff
    location, dropoff borough and that window. The input frame is not modified.

    Args:
        df: Trip-level DataFrame with ``tpep_dropoff_datetime``,
            ``do_location_id`` and ``do_borough`` columns.
        ts_column: Column used for the date-range filter.
        start_date: Inclusive lower bound for the filter (skipped when falsy).
        end_date: Exclusive upper bound for the filter (skipped when falsy).

    Returns:
        DataFrame with columns ``row_id``, ``location_id``, ``borough``,
        ``ts`` (datetime), ``count_trips_window_30m_dropoff_zip`` and
        ``dropoff_is_weekend``.
    """
    trips = filter_df_by_ts(df, ts_column, start_date, end_date)
    # Build the window key as a standalone Series instead of writing columns
    # into `trips`: the filter may hand back the caller's frame or a slice of
    # it, so assigning there would mutate the input or raise
    # SettingWithCopyWarning.
    window = (
        pd.to_datetime(trips['tpep_dropoff_datetime'])
        .dt.floor('30min')
        .dt.strftime('%Y-%m-%d %H:%M:%S')
        .rename('window')
    )
    dropoff_features = (
        trips.groupby(['do_location_id', 'do_borough', window])
        .agg(count_trips_window_30m_dropoff_zip=('do_borough', 'count'))
        .reset_index()
        .rename(columns={'do_location_id': 'location_id', 'do_borough': 'borough', 'window': 'ts'})
    )
    # The window key was formatted as text for grouping; turn it back into a timestamp.
    dropoff_features['ts'] = pd.to_datetime(dropoff_features['ts'])
    # dayofweek is 0 for Monday, so 5 and 6 are Saturday and Sunday.
    dropoff_features['dropoff_is_weekend'] = dropoff_features['ts'].dt.dayofweek.isin([5, 6])
    # Sequential surrogate key as the first column.
    dropoff_features.insert(0, 'row_id', range(len(dropoff_features)))

    return dropoff_features

In [None]:
# Log in to Hopsworks and get a handle to the project's feature store.
# NOTE(review): login() is called without arguments, so the HOPS_HOST and
# HOPS_API_KEY values from the configuration cell are not used here — confirm
# that this is intended.
project = hopsworks.login()
fs = project.get_feature_store()

In [None]:
scale_factor = [1, 2, 5, 10]

ROWS_PER_SCALE_UNIT = 1_000_000  # scale factor N ingests N million raw rows
LARGE_BATCH_THRESHOLD = 5_000_000  # from this many rows on, ingest in 5M-row batches
# Date range applied to both the pickup and the dropoff timestamps.
FEATURE_START_DATE = datetime(2011, 1, 1)
FEATURE_END_DATE = datetime(2023, 1, 31)

for sf in scale_factor:
    total_rows = sf * ROWS_PER_SCALE_UNIT

    # Derive the pickup and dropoff aggregates from the first `total_rows` raw rows.
    raw_data = get_raw_data(total_rows, 0)
    pickup_features = pickup_features_fn(
        df=raw_data,
        ts_column="tpep_pickup_datetime",
        start_date=FEATURE_START_DATE,
        end_date=FEATURE_END_DATE,
    )
    pickup_features['ts'] = pd.to_datetime(pickup_features['ts'], format='%Y-%m-%d %H:%M:%S')
    dropoff_features = dropoff_features_fn(
        df=raw_data,
        ts_column="tpep_dropoff_datetime",
        start_date=FEATURE_START_DATE,
        end_date=FEATURE_END_DATE,
    )
    dropoff_features['ts'] = pd.to_datetime(dropoff_features['ts'], format='%Y-%m-%d %H:%M:%S')

    # event_time is given as the feature name itself rather than wrapped in a list.
    pickup_fg = fs.get_or_create_feature_group(
        name=f"pit_pickup_features_{sf}",
        version=1,
        primary_key=["row_id"],
        event_time="ts",
        online_enabled=False,
        description="NYC Taxi data pickup features"
    )
    pickup_fg.insert(pickup_features, write_options={"wait_for_job": True})

    dropoff_fg = fs.get_or_create_feature_group(
        name=f"pit_dropoff_features_{sf}",
        version=1,
        primary_key=["row_id"],
        event_time="ts",
        online_enabled=False,
        description="NYC Taxi data dropoff features"
    )
    dropoff_fg.insert(dropoff_features, write_options={"wait_for_job": True})

    # Ingest 1M rows at a time for SF 1 and 2, 5M at a time from SF 5 upwards.
    if total_rows < LARGE_BATCH_THRESHOLD:
        batch_read_limit = 1_000_000
    else:
        batch_read_limit = 5_000_000

    # The raw feature group is the same for every batch, so look it up once.
    raw_fg = fs.get_or_create_feature_group(
        name=f"pit_raw_features_{sf}",
        version=1,
        primary_key=["row_id"],
        event_time="tpep_pickup_datetime",
        online_enabled=False,
        description="NYC Taxi data raw data features"
    )
    for offset in range(0, total_rows, batch_read_limit):
        # Clamp the last batch so that no more than `total_rows` rows are ingested.
        rows_in_batch = min(batch_read_limit, total_rows - offset)
        # Reuse the `raw_data` name so the previous frame can be garbage-collected.
        raw_data = get_raw_data(rows_in_batch, offset)
        raw_fg.insert(raw_data, write_options={"wait_for_job": True})

## Benchmark: point-in-time (PIT) correct join

In [None]:
scale_factor = [1, 2, 5, 10]

for sf in scale_factor:
    # Retrieval: look up the three feature groups created for this scale factor.
    pickup_fg = fs.get_feature_group(
        name=f"pit_pickup_features_{sf}",
        version=1
    )
    dropoff_fg = fs.get_feature_group(
        name=f"pit_dropoff_features_{sf}",
        version=1
    )
    raw_fg = fs.get_feature_group(
        name=f"pit_raw_features_{sf}",
        version=1
    )
    # PIT JOIN: only building the query and reading its result is timed.
    # perf_counter() is a monotonic, high-resolution clock, so the measurement
    # is not affected by system clock adjustments during a long read.
    start = time.perf_counter()
    df = (
        raw_fg.select_all()
        .join(pickup_fg.select_all(), left_on=["pu_location_id"], right_on=["location_id"], prefix="pf_")
        .join(dropoff_fg.select_all(), left_on=["do_location_id"], right_on=["location_id"], prefix="df_")
        .read()
    )
    time_taken = time.perf_counter() - start
    print(f"PIT JOIN - SF {sf}: {time_taken}")


## Clean up

In [None]:
# The loops above rebind pickup_fg / dropoff_fg / raw_fg on every iteration, so
# those names only refer to the last scale factor. Look each feature group up
# again by name so that the groups of every scale factor are deleted.
for sf in scale_factor:
    feature_group_names = (
        f"pit_pickup_features_{sf}",
        f"pit_dropoff_features_{sf}",
        f"pit_raw_features_{sf}",
    )
    for feature_group_name in feature_group_names:
        fs.get_feature_group(name=feature_group_name, version=1).delete()

In [None]:
# Close the DuckDB connection opened earlier in the notebook.
con.close()