# Build out Featureset

In [None]:
# Register a notebook text widget named "environment" whose default value is "dev".
dbutils.widgets.text("environment", "dev")

In [None]:
# Read the current value of the "environment" widget and derive the
# environment-specific catalog name (brian_ml_<environment>) from it.
curr_env = dbutils.widgets.get("environment")
curr_catalog = f'brian_ml_{curr_env}'

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone

# Feature Engineering Functions

In [None]:
@udf(returnType=IntegerType())
def is_weekend(dt):
    """
    Spark UDF returning 1 if ``dt`` falls on a Saturday or Sunday in New York time, else 0.

    :param dt: timestamp value of the column the UDF is applied to; may be null.
    :return: 1 for weekend, 0 for weekday, or None when ``dt`` is null.
    """
    # Propagate nulls instead of raising AttributeError inside the UDF worker,
    # which would fail the whole Spark job.
    if dt is None:
        return None
    tz = "America/New_York"
    # NOTE(review): presumably dt arrives as a naive datetime, in which case
    # astimezone() interprets it in the worker's local time zone - confirm.
    return int(dt.astimezone(timezone(tz)).weekday() >= 5)  # 5 = Saturday, 6 = Sunday


def filter_df_by_ts(df, ts_column, start_date, end_date):
    """
    Restrict ``df`` to rows whose ``ts_column`` lies in ``[start_date, end_date)``.

    Each bound is applied only when it is truthy; when ``ts_column`` is falsy
    no filtering happens at all and ``df`` is returned unchanged.

    :param df: DataFrame to filter.
    :param ts_column: name of the timestamp column to filter on, or None.
    :param start_date: inclusive lower bound, or None for no lower bound.
    :param end_date: exclusive upper bound, or None for no upper bound.
    :return: the (possibly) filtered DataFrame.
    """
    # Without a timestamp column there is nothing to filter on.
    if not ts_column:
        return df

    filtered = df
    if start_date:
        # Lower bound is inclusive.
        filtered = filtered.filter(col(ts_column) >= start_date)
    if end_date:
        # Upper bound is exclusive.
        filtered = filtered.filter(col(ts_column) < end_date)
    return filtered

def pickup_features_fn(df, ts_column=None, start_date=None, end_date=None):
    """
    Computes the pickup_features feature group.

    Aggregates trips per pickup zip over 1-hour windows that slide every
    15 minutes on ``tpep_pickup_datetime``.

    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    All three are optional; when omitted, no time filtering is applied.

    :param df: input DataFrame; must contain ``pickup_zip``,
        ``tpep_pickup_datetime`` and ``fare_amount`` columns.
    :param ts_column: name of the timestamp column used for the time-range filter.
    :param start_date: inclusive lower bound for ``ts_column``, or None.
    :param end_date: exclusive upper bound for ``ts_column``, or None.
    :return: DataFrame with columns ``zip``, ``ts`` (window end),
        ``mean_fare_window_1h_pickup_zip`` (float) and
        ``count_trips_window_1h_pickup_zip`` (int).
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    pickupzip_features = (
        df.groupBy(
            "pickup_zip", window("tpep_pickup_datetime", "1 hour", "15 minutes")
        )  # 1 hour window, sliding every 15 minutes
        .agg(
            mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"),
            count("*").alias("count_trips_window_1h_pickup_zip"),
        )
        .select(
            col("pickup_zip").alias("zip"),
            # The feature timestamp is the end of the window, round-tripped
            # through epoch seconds (whole-second precision).
            unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
            col("mean_fare_window_1h_pickup_zip").cast(FloatType()),
            col("count_trips_window_1h_pickup_zip").cast(IntegerType()),
        )
    )
    return pickupzip_features


def dropoff_features_fn(df, ts_column=None, start_date=None, end_date=None):
    """
    Computes the dropoff_features feature group.

    Counts trips per dropoff zip over 30-minute windows on
    ``tpep_dropoff_datetime`` and flags whether the window end falls on a weekend.

    To restrict features to a time range, pass in ts_column, start_date, and/or end_date as kwargs.
    All three are optional; when omitted, no time filtering is applied.

    :param df: input DataFrame; must contain ``dropoff_zip`` and
        ``tpep_dropoff_datetime`` columns.
    :param ts_column: name of the timestamp column used for the time-range filter.
    :param start_date: inclusive lower bound for ``ts_column``, or None.
    :param end_date: exclusive upper bound for ``ts_column``, or None.
    :return: DataFrame with columns ``zip``, ``ts`` (window end),
        ``count_trips_window_30m_dropoff_zip`` (int) and ``dropoff_is_weekend``.
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    dropoffzip_features = (
        # No slide duration is given, so each row falls into one 30-minute window.
        df.groupBy("dropoff_zip", window("tpep_dropoff_datetime", "30 minute"))
        .agg(count("*").alias("count_trips_window_30m_dropoff_zip"))
        .select(
            col("dropoff_zip").alias("zip"),
            # The feature timestamp is the end of the window, round-tripped
            # through epoch seconds (whole-second precision).
            unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
            col("count_trips_window_30m_dropoff_zip").cast(IntegerType()),
            is_weekend(col("window.end")).alias("dropoff_is_weekend"),
        )
    )
    return dropoffzip_features

# Load and Inspect Data

Let's load our raw data and see what it contains.

In [None]:
# Load the raw_data table from the warehouse schema of the current
# environment's catalog, then render it in the notebook for inspection.
raw_data = spark.table(f'{curr_catalog}.warehouse.raw_data')
display(raw_data)

In [None]:
from datetime import datetime

# Compute both feature groups from the raw data. filter_df_by_ts keeps rows
# with start_date <= ts < end_date, so rows on or after 2016-01-31 00:00 are
# excluded.
pickup_features = pickup_features_fn(
    df=raw_data,
    ts_column="tpep_pickup_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)
# Dropoff features use the same date range but filter on the dropoff timestamp.
dropoff_features = dropoff_features_fn(
    df=raw_data,
    ts_column="tpep_dropoff_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)


In [None]:
# Preview the computed pickup features.
display(pickup_features)

In [None]:
# Preview the computed dropoff features.
display(dropoff_features)

# Create the Feature Table Definitions

We will use SQL syntax to create the feature tables before we write to them.

In [None]:
%sql

-- Create the pickup feature table if it does not exist yet.
-- The primary key is (zip, ts), with ts marked as the TIMESERIES key column.
-- NOTE(review): ${environment} is presumably substituted from the "environment" widget - confirm.
CREATE TABLE IF NOT EXISTS brian_ml_${environment}.warehouse.trip_pickup_time_series_features(
  zip INT NOT NULL,
  ts TIMESTAMP NOT NULL,
  mean_fare_window_1h_pickup_zip FLOAT,
  count_trips_window_1h_pickup_zip INT,
  CONSTRAINT trip_pickup_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "Taxi Fares. Pickup Time Series Features";


In [None]:
%sql

-- Create the dropoff feature table if it does not exist yet.
-- The primary key is (zip, ts), with ts marked as the TIMESERIES key column.
-- NOTE(review): ${environment} is presumably substituted from the "environment" widget - confirm.
CREATE TABLE IF NOT EXISTS brian_ml_${environment}.warehouse.trip_dropoff_time_series_features(
  zip INT NOT NULL,
  ts TIMESTAMP NOT NULL,
  count_trips_window_30m_dropoff_zip INT,
  dropoff_is_weekend INT,
  CONSTRAINT trip_dropoff_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "Taxi Fares. Dropoff Time Series Features";

# Write to Feature Tables

We will now write the computed features to the feature tables.

In [None]:
from databricks import feature_store

fs = feature_store.FeatureStoreClient()

# mode='overwrite' replaces the existing contents of each feature table with
# the given DataFrame. To upsert rows into the existing table instead, use
# mode='merge'.
# The shuffle partition count is set to 5 before the writes below.
spark.conf.set("spark.sql.shuffle.partitions", "5")
fs.write_table(
    name=f"{curr_catalog}.warehouse.trip_pickup_time_series_features",
    df=pickup_features,
    mode='overwrite'
)
fs.write_table(
    name=f"{curr_catalog}.warehouse.trip_dropoff_time_series_features",
    df=dropoff_features,
    mode='overwrite'
)