# Feathr Local Spark Quickstart - NYC Taxi Demo
This notebook demonstrates how to use Feathr with a local Spark cluster to define features, build a point-in-time correct training dataset, train a model, and materialize features to Redis for online scoring. We will use the NYC Taxi dataset to predict the fare amount for a taxi ride. The dataset is available on [Kaggle](https://www.kaggle.com/c/new-york-city-taxi-fare-prediction/data).

## 1. Install Feathr and Necessary Dependencies

Install Feathr and the necessary packages by running `pip install feathr[notebook]` if you haven't installed them already.

In [None]:
%pip install -U pandavro scikit-learn

## 2. Create Shareable Features with Feathr Feature Definition Configs

In this notebook, the values needed for authentication are set directly in code: for example, the Redis password is passed through the `REDIS_PASSWORD` environment variable when we configure the Feathr client. Instead of hard-coding such values, you can also retrieve them from [Azure Key Vault](https://azure.microsoft.com/en-us/services/key-vault/).
Please refer to [how-to guide documents for granting key-vault access](https://feathr-ai.github.io/feathr/how-to-guides/azure-deployment-arm.html#3-grant-key-vault-and-synapse-access-to-selected-users-optional) and [Databricks' Azure Key Vault-backed scopes](https://learn.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes) for more details.

In [None]:
import glob
import os
import tempfile
from datetime import datetime, timedelta
from math import sqrt

import pandas as pd
import pandavro as pdx
from feathr import FeathrClient
from feathr import BOOLEAN, FLOAT, INT32, ValueType
from feathr import Feature, DerivedFeature, FeatureAnchor
from feathr import BackfillTime, MaterializationSettings
from feathr import FeatureQuery, ObservationSettings
from feathr import RedisSink
from feathr import INPUT_CONTEXT, HdfsSource
from feathr import WindowAggTransformation
from feathr import TypedKey
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from pyspark.sql import DataFrame

In [None]:
import feathr
print(feathr.__version__)

### Configure Feathr
We write a YAML configuration for the local Spark runtime, the Redis online store, and the feature registry to a temporary file, and initialize the Feathr client from it. Values in this configuration can be overridden by environment variables.
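
As a small illustration of the override mechanism, the helper below builds the environment-variable name for a nested config key. It assumes Feathr's convention of joining the YAML layers with `__` and upper-casing the result (for example, `SPARK_CONFIG__SPARK_CLUSTER`); the helper `config_env_var_name` is hypothetical and not part of the Feathr API.

```python
def config_env_var_name(*layers: str) -> str:
    """Return the env-var name assumed to override a nested Feathr config key.

    Assumption: each YAML layer is upper-cased and the layers are joined with "__".
    This helper is hypothetical and only illustrates the naming convention.
    """
    return "__".join(layer.upper() for layer in layers)


# Override target: spark_config -> local -> master in the YAML config.
print(config_env_var_name("spark_config", "local", "master"))  # SPARK_CONFIG__LOCAL__MASTER
```

To apply such an override, set the variable (e.g. via `os.environ`) before creating the `FeathrClient`.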

In [None]:
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"
os.environ['REDIS_PASSWORD'] = "foobared"  # password of your local Redis server; change it to match your setup

yaml_config = f"""
api_version: 1
project_config:
  project_name: 'local_spark'
  required_environment_variables:
  optional_environment_variables:
    # the environment variables are optional, however you will need them if you want to use some of the services:
    - ADLS_ACCOUNT
    - ADLS_KEY
    - WASB_ACCOUNT
    - WASB_KEY
    - S3_ACCESS_KEY
    - S3_SECRET_KEY
    - JDBC_TABLE
    - JDBC_USER
    - JDBC_PASSWORD
    - KAFKA_SASL_JAAS_CONFIG


spark_config:
  # Spark runtime to use. Currently supported: azure_synapse, databricks, local
  spark_cluster: 'local'
  spark_result_output_parts: '1'
  local:
    master: 'local[*]'
    feathr_runtime_location:

online_store:
  redis:
    # Redis configs to access Redis cluster
    host: '127.0.0.1'
    port: 6379
    ssl_enabled: False

feature_registry:
  # The API endpoint of the registry service
  api_endpoint: "http://127.0.0.1:8000/api/v1"
"""

# Write the config to a temporary YAML file; delete=False keeps it on disk after the handle is closed.
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as tmp:
    tmp.write(yaml_config)

### Initialize Feathr Client

In [None]:
client = FeathrClient(tmp.name)

### Download the sample data
We download the NYC green taxi trip data for April 2020 and save a copy to `/tmp` so that the local Spark job can read it.

In [None]:
DATA_FILE_PATH = "/tmp/green_tripdata_2020-04_with_index.csv"
df_raw = pd.read_csv("https://azurefeathrstorage.blob.core.windows.net/public/sample_data/green_tripdata_2020-04_with_index.csv")
df_raw.to_csv(DATA_FILE_PATH, index=False)
df_raw

### Defining features with Feathr

In Feathr, a feature is viewed as a function, mapping a key and timestamp to a feature value. For more details, please see [Feathr Feature Definition Guide](https://github.com/feathr-ai/feathr/blob/main/docs/concepts/feature-definition.md).

* The feature key (a.k.a. entity id) identifies the subject of feature, e.g. a user_id or location_id.
* The feature name is the aspect of the entity that the feature is indicating, e.g. the age of the user.
* The feature value is the actual value of that aspect at a particular time, e.g. the user's age is 30 in 2022.

Note that, in some cases, a feature could be just a transformation function that has no entity key or timestamp involved, e.g. *the day of week of the request timestamp*.
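
To make the "feature as a function" view concrete, here is a minimal plain-Python sketch (not Feathr's implementation) that maps a key and a timestamp to the latest value recorded at or before that timestamp, plus a keyless feature that depends only on the request timestamp. The `HISTORY` data and the `user_age` feature are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical time-stamped values of a "user_age" feature, sorted by timestamp per key.
HISTORY = {
    "user_1": [(datetime(2021, 3, 1), 29), (datetime(2022, 3, 1), 30)],
}


def feature_value(key: str, ts: datetime):
    """Return the latest value recorded at or before ts for key, or None if there is none."""
    rows = HISTORY.get(key, [])
    times = [t for t, _ in rows]
    idx = bisect_right(times, ts)  # number of rows with timestamp <= ts
    return rows[idx - 1][1] if idx else None


def day_of_week(request_ts: datetime) -> int:
    """A keyless feature: depends only on the request timestamp (Monday=0 ... Sunday=6)."""
    return request_ts.weekday()


print(feature_value("user_1", datetime(2022, 6, 1)))  # 30
print(feature_value("user_1", datetime(2021, 6, 1)))  # 29
print(feature_value("user_1", datetime(2020, 6, 1)))  # None
print(day_of_week(datetime(2022, 6, 1)))  # 2 (a Wednesday)
```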

There are two types of features, anchored features and derived features:

* **Anchored features**: Features that are directly extracted from sources, with or without aggregation.
* **Derived features**: Features that are computed on top of other features.

#### Define anchored features

Anchored features need a feature source that describes the raw data the feature values are computed from. A source is either `INPUT_CONTEXT` (the features are extracted directly from the observation data) or a `feathr.source.Source` object, such as the `HdfsSource` defined below.

In [None]:
TIMESTAMP_COL = "lpep_dropoff_datetime"
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"

In [None]:
def preprocessing(df: DataFrame) -> DataFrame:
    import pyspark.sql.functions as F
    df = df.withColumn("fare_amount_cents", (F.col("fare_amount") * 100.0).cast("float"))
    return df

batch_source = HdfsSource(
    name="nycTaxiBatchSource",
    path=DATA_FILE_PATH,
    event_timestamp_column=TIMESTAMP_COL,
    preprocessing=preprocessing,
    timestamp_format=TIMESTAMP_FORMAT,
)

In [None]:
# We define f_trip_distance and f_trip_time_duration features separately
# so that we can reuse them later for the derived features.
f_trip_distance = Feature(
    name="f_trip_distance",
    feature_type=FLOAT,
    transform="trip_distance",
)
f_trip_time_duration = Feature(
    name="f_trip_time_duration",
    feature_type=FLOAT,
    transform="cast_float((to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime)) / 60)",
)



features = [
    f_trip_distance,
    f_trip_time_duration,
    Feature(
        name="f_is_long_trip_distance",
        feature_type=BOOLEAN,
        transform="trip_distance > 30.0",
    ),
    Feature(
        name="f_day_of_week",
        feature_type=INT32,
        transform="dayofweek(lpep_dropoff_datetime)",
    ),
    Feature(
        name="f_day_of_month",
        feature_type=INT32,
        transform="dayofmonth(lpep_dropoff_datetime)",
    ),
    Feature(
        name="f_hour_of_day",
        feature_type=INT32,
        transform="hour(lpep_dropoff_datetime)",
    ),
]

# After you have defined features, bring them together to build the anchor to the source.
feature_anchor = FeatureAnchor(
    name="feature_anchor",
    source=INPUT_CONTEXT,  # Pass through source, i.e. observation data.
    features=features,
)
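
The `INPUT_CONTEXT` transforms above are Spark SQL expressions evaluated on each observation row. The sketch below re-implements their logic in plain Python for a single made-up trip (not taken from the dataset), as a way to check what each feature should contain. It assumes Spark's `dayofweek()` convention of 1 = Sunday through 7 = Saturday; the helper `anchored_features` is hypothetical.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss"


def anchored_features(row: dict) -> dict:
    """Plain-Python approximation of the INPUT_CONTEXT transforms, for one row."""
    pickup = datetime.strptime(row["lpep_pickup_datetime"], FMT)
    dropoff = datetime.strptime(row["lpep_dropoff_datetime"], FMT)
    return {
        "f_trip_distance": float(row["trip_distance"]),
        # (to_unix_timestamp(dropoff) - to_unix_timestamp(pickup)) / 60 -> trip length in minutes
        "f_trip_time_duration": (dropoff - pickup).total_seconds() / 60,
        "f_is_long_trip_distance": row["trip_distance"] > 30.0,
        # Spark dayofweek(): 1 = Sunday ... 7 = Saturday; isoweekday(): 1 = Monday ... 7 = Sunday
        "f_day_of_week": dropoff.isoweekday() % 7 + 1,
        "f_day_of_month": dropoff.day,
        "f_hour_of_day": dropoff.hour,
    }


# A made-up trip used only for illustration.
row = {
    "lpep_pickup_datetime": "2020-04-01 00:44:02",
    "lpep_dropoff_datetime": "2020-04-01 00:52:23",
    "trip_distance": 1.2,
}
print(anchored_features(row))
```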

For the features with aggregation, the supported functions are as follows:

| Aggregation Function | Input Type | Description |
| --- | --- | --- |
| SUM, COUNT, MAX, MIN, AVG | Numeric | Applies the numerical operation on the numeric inputs. |
| MAX_POOLING, MIN_POOLING, AVG_POOLING | Numeric Vector | Applies the max/min/avg operation on a per-entry basis for a given collection of numeric vectors. |
| LATEST | Any | Returns the latest non-null value from within the defined time window. |
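
The semantics in the table can be sketched in plain Python as follows. This is an illustration of what each function computes over the rows that fall inside a window, not Feathr's implementation; the helpers `aggregate`, `pool`, and `latest` are hypothetical.

```python
from statistics import mean


def aggregate(values, func):
    """Apply a numeric aggregation to a list of scalars (nulls already removed)."""
    ops = {"SUM": sum, "COUNT": len, "MAX": max, "MIN": min, "AVG": mean}
    return ops[func](values)


def pool(vectors, func):
    """Per-entry (element-wise) pooling over equally sized numeric vectors."""
    ops = {"MAX_POOLING": max, "MIN_POOLING": min, "AVG_POOLING": mean}
    return [ops[func](column) for column in zip(*vectors)]


def latest(timestamped_values):
    """LATEST: the value with the greatest timestamp among the non-null values."""
    non_null = [(ts, v) for ts, v in timestamped_values if v is not None]
    return max(non_null, key=lambda pair: pair[0])[1] if non_null else None


print(aggregate([1.0, 4.0, 7.0], "AVG"))        # 4.0
print(pool([[1, 5], [3, 2]], "MAX_POOLING"))    # [3, 5]
print(latest([(1, "a"), (2, "b"), (3, None)]))  # b
```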

In [None]:
agg_key = TypedKey(
    key_column="DOLocationID",
    key_column_type=ValueType.INT32,
    description="location id in NYC",
    full_name="nyc_taxi.location_id",
)

agg_window = "90d"

# Anchored features with aggregations
agg_features = [
    Feature(
        name="f_location_avg_fare",
        key=agg_key,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="fare_amount_cents",
            agg_func="AVG",
            window=agg_window,
        ),
    ),
    Feature(
        name="f_location_max_fare",
        key=agg_key,
        feature_type=FLOAT,
        transform=WindowAggTransformation(
            agg_expr="fare_amount_cents",
            agg_func="MAX",
            window=agg_window,
        ),
    ),
]

agg_feature_anchor = FeatureAnchor(
    name="agg_feature_anchor",
    source=batch_source,  # External data source for feature. Typically a data table.
    features=agg_features,
)
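
Conceptually, `f_location_avg_fare` and `f_location_max_fare` aggregate `fare_amount_cents` over all rows with the same `DOLocationID` whose timestamp falls within the 90-day window ending at the observation time. The plain-Python sketch below assumes the window is `(as_of - 90d, as_of]`; Feathr's exact boundary handling may differ. The rows in `ROWS` and the helper `window_agg` are hypothetical.

```python
from datetime import datetime, timedelta

# Hypothetical raw rows: (DOLocationID, dropoff time, fare_amount_cents).
ROWS = [
    (247, datetime(2020, 1, 10), 1000.0),
    (247, datetime(2020, 3, 20), 2000.0),
    (247, datetime(2020, 4, 2), 3000.0),
    (220, datetime(2020, 3, 25), 500.0),
]


def window_agg(key, as_of, window=timedelta(days=90)):
    """AVG and MAX of fare_amount_cents for `key` over rows in (as_of - window, as_of].

    Rows after `as_of` are excluded, so the aggregate only uses data that existed at that time.
    """
    fares = [fare for k, ts, fare in ROWS if k == key and as_of - window < ts <= as_of]
    if not fares:
        return None, None
    return sum(fares) / len(fares), max(fares)


print(window_agg(247, datetime(2020, 4, 1)))   # (1500.0, 2000.0): the 2020-04-02 row is in the future
print(window_agg(247, datetime(2020, 4, 15)))  # (2500.0, 3000.0): the 2020-01-10 row fell out of the window
```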

#### Define derived features

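We also define two derived features on top of the anchored features: `f_trip_time_distance`, the product of `f_trip_distance` and `f_trip_time_duration`, and `f_trip_time_rounded`, computed as `f_trip_time_duration % 10` (the remainder of the trip duration modulo 10):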

In [None]:
f_trip_time_distance = DerivedFeature(
    name="f_trip_time_distance",
    feature_type=FLOAT,
    input_features=[f_trip_distance, f_trip_time_duration],
    transform="f_trip_distance * f_trip_time_duration",
)

f_trip_time_rounded = DerivedFeature(
    name="f_trip_time_rounded",
    feature_type=INT32,
    input_features=[f_trip_time_duration],
    transform="f_trip_time_duration % 10",
)

derived_feature = [f_trip_time_distance, f_trip_time_rounded]
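
A derived feature is evaluated on already-computed feature values rather than on raw data. The sketch below evaluates both transform expressions in plain Python for hypothetical input values; note that Python's `%` matches Spark's only for non-negative operands, which holds for trip durations. The helper `eval_derived_features` is hypothetical.

```python
def eval_derived_features(f_trip_distance: float, f_trip_time_duration: float) -> dict:
    """Evaluate the two derived-feature expressions on already-computed anchored values."""
    return {
        # transform="f_trip_distance * f_trip_time_duration"
        "f_trip_time_distance": f_trip_distance * f_trip_time_duration,
        # transform="f_trip_time_duration % 10" is a remainder, not a rounding
        "f_trip_time_rounded": f_trip_time_duration % 10,
    }


print(eval_derived_features(2.5, 12.0))  # {'f_trip_time_distance': 30.0, 'f_trip_time_rounded': 2.0}
```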

### Build features

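Finally, we build the features by passing the anchors and derived features to the client, so that they can be used to generate training data and to materialize feature values.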

In [None]:
client.build_features(
    anchor_list=[feature_anchor, agg_feature_anchor],
    derived_feature_list=derived_feature,
)

In [None]:
# Optional: register the features with the Feathr registry.
# This requires a registry service running at the api_endpoint configured under feature_registry.
client.register_features()
client.list_registered_features(client.project_name)
res = client.get_features_from_registry(client.project_name)
res

## 3. Create Training Data Using Point-in-Time Correct Feature Join

After feature producers have defined the features (as described in the feature definition section above), feature consumers can use them. Consumers provide observation data and use a feature query to look up feature values from different feature tables.

To create a training dataset with Feathr, we specify which features to fetch and how to join them to the observation data: a `FeatureQuery` lists the feature names and the join key, and `ObservationSettings` points to the observation data and its event timestamp column. Feathr then performs a point-in-time correct join, so each observation row only receives feature values that were available at its event timestamp.

To learn more on this topic, please refer to [Point-in-time Correctness](https://github.com/linkedin/feathr/blob/main/docs/concepts/point-in-time-join.md).
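
The core idea of a point-in-time join can be sketched in a few lines of plain Python: for each observation `(key, event_ts)`, attach the latest feature value whose timestamp is at or before `event_ts`, so values from the future never leak into training data. This is a conceptual sketch, not how Feathr executes the join on Spark; `FEATURE_TABLE` and `point_in_time_join` are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature table: key -> time-sorted list of (timestamp, value).
FEATURE_TABLE = {
    247: [(datetime(2020, 4, 1, 8), 1500.0), (datetime(2020, 4, 2, 8), 1800.0)],
}


def point_in_time_join(observations):
    """Attach to each (key, event_ts) the latest feature value with timestamp <= event_ts."""
    joined = []
    for key, event_ts in observations:
        history = FEATURE_TABLE.get(key, [])
        idx = bisect_right([ts for ts, _ in history], event_ts)
        joined.append((key, event_ts, history[idx - 1][1] if idx else None))
    return joined


obs = [
    (247, datetime(2020, 4, 1, 12)),  # after the first feature value -> 1500.0
    (247, datetime(2020, 4, 1, 7)),   # before any feature value -> None (no future leakage)
    (220, datetime(2020, 4, 3)),      # key with no feature values -> None
]
for joined_row in point_in_time_join(obs):
    print(joined_row)
```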

In [None]:
feature_names = [feature.name for feature in features + agg_features]
feature_names

In [None]:
now = datetime.now().strftime("%Y%m%d%H%M%S")
output_path = os.path.join("debug", f"test_output_{now}")

offline_features_path = output_path

In [None]:
# Features to request; any subset of the defined features can be used.
query = FeatureQuery(
    feature_list=feature_names,
    key=agg_key,
)
settings = ObservationSettings(
    observation_path=DATA_FILE_PATH,
    event_timestamp_column=TIMESTAMP_COL,
    timestamp_format=TIMESTAMP_FORMAT,
)
client.get_offline_features(
    observation_settings=settings,
    feature_query=query,
    output_path=offline_features_path,
)

client.wait_job_to_finish(timeout_sec=5000)

In [None]:
# Read every Avro part file written by the offline job and stack them into a single DataFrame.
output_files = glob.glob(os.path.join(output_path, "*.avro"))
dataframe_list = [pdx.read_avro(file) for file in output_files]
vertical_concat_df = pd.concat(dataframe_list, axis=0, ignore_index=True)

vertical_concat_df

## 4. Train and Evaluate a Prediction Model

After generating all the features, we train and evaluate a machine learning model to predict NYC taxi fares. In this example, we use scikit-learn's [GradientBoostingRegressor](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.GradientBoostingRegressor.html).

Note that designing features, training prediction models, and evaluating them is an iterative process: the model's performance may be used to refine the features as part of the modeling process.

In [None]:
# Drop the timestamp and text columns, replace missing values with 0, and cast the target to float.
from sklearn.ensemble import GradientBoostingRegressor
final_df = vertical_concat_df
final_df.drop(["lpep_pickup_datetime", "lpep_dropoff_datetime",
              "store_and_fwd_flag"], axis=1, inplace=True, errors='ignore')
final_df.fillna(0, inplace=True)
final_df['fare_amount'] = final_df['fare_amount'].astype("float64")


train_x, test_x, train_y, test_y = train_test_split(final_df.drop(["fare_amount"], axis=1),
                                                    final_df["fare_amount"],
                                                    test_size=0.2,
                                                    random_state=42)


In [None]:
model = GradientBoostingRegressor()
model.fit(train_x, train_y)

y_predict = model.predict(test_x)

y_actual = test_y.values.flatten().tolist()
rmse = sqrt(mean_squared_error(y_actual, y_predict))

# Sum of absolute errors and sum of actual fares, used for the error ratio in the next cell.
sum_errors = sum(abs(actual_val - predict_val) for actual_val, predict_val in zip(y_actual, y_predict))
sum_actuals = sum(y_actual)

In [None]:
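# Weighted MAPE: total absolute error divided by the total actual fare
# (not the unweighted mean of per-row percentage errors).
mean_abs_percent_error = sum_errors / sum_actuals
print("Model RMSE:")
print(rmse)
print()
print("Model weighted MAPE:")
print(mean_abs_percent_error)
print()
print("Model Accuracy (1 - weighted MAPE):")
print(1 - mean_abs_percent_error)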
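
The evaluation above divides the total absolute error by the total of the actual fares, which is a weighted MAPE rather than the average of per-row percentage errors. The toy example below (hypothetical values, plain Python) computes RMSE, the weighted MAPE, and the unweighted MAPE side by side to show that the two percentage metrics differ; it assumes all actual values are non-zero.

```python
from math import sqrt


def metrics(actual, predicted):
    """Return RMSE, weighted MAPE (sum of errors / sum of actuals), and unweighted MAPE."""
    errors = [abs(a - p) for a, p in zip(actual, predicted)]
    rmse = sqrt(sum(e * e for e in errors) / len(errors))
    weighted_mape = sum(errors) / sum(actual)
    mape = sum(e / a for e, a in zip(errors, actual)) / len(errors)  # assumes no zero actuals
    return rmse, weighted_mape, mape


toy_rmse, toy_wmape, toy_mape = metrics([10.0, 20.0], [12.0, 15.0])
print(round(toy_rmse, 4), round(toy_wmape, 4), round(toy_mape, 4))  # 3.8079 0.2333 0.225
```

Weighting by the actual values means that errors on expensive trips count more, while cheap trips with large relative errors cannot dominate the score.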


## 5. Materialize Feature Values for Online Scoring

So far, Feathr computed feature values on the fly when we requested them. We can also pre-compute feature values and materialize them to offline or online storage, such as Redis, for online scoring.

Note that only features anchored to an offline data source can be materialized; here, those are the aggregation features anchored to `batch_source`.
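
The materialization job below uses `BackfillTime` with identical `start` and `end` dates and a one-day `step`. Assuming both ends are inclusive, that describes a single backfill date. The plain-Python sketch enumerates the dates such a range would cover; `backfill_dates` is a hypothetical helper, not part of Feathr.

```python
from datetime import datetime, timedelta


def backfill_dates(start: datetime, end: datetime, step: timedelta):
    """List the dates a backfill would cover, assuming start and end are both inclusive."""
    dates, current = [], start
    while current <= end:
        dates.append(current)
        current += step
    return dates


print(backfill_dates(datetime(2020, 4, 1), datetime(2020, 4, 1), timedelta(days=1)))
print(len(backfill_dates(datetime(2020, 4, 1), datetime(2020, 4, 7), timedelta(days=1))))  # 7
```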

In [None]:
materialized_feature_names = [feature.name for feature in agg_features]
materialized_feature_names

In [None]:
REDIS_KEY = None

In [None]:
FEATURE_TABLE_NAME = "nycTaxiDemoFeature"

# Backfill the aggregation features for 2020-04-01 (start == end, daily step) into Redis.
backfill_time = BackfillTime(
    start=datetime(2020, 4, 1),
    end=datetime(2020, 4, 1),
    step=timedelta(days=1),
)
redisSink = RedisSink(table_name=FEATURE_TABLE_NAME)
settings = MaterializationSettings(
    FEATURE_TABLE_NAME + ".job",
    sinks=[redisSink],
    feature_names=materialized_feature_names,
    backfill_time=backfill_time,
)
client.materialize_features(settings)

client.wait_job_to_finish(timeout_sec=5000)

In [None]:
# To look up a single key, use client.get_online_features instead.
materialized_feature_values = client.multi_get_online_features(
    feature_table=FEATURE_TABLE_NAME,
    keys=["247", "220"],
    feature_names=materialized_feature_names,
)
materialized_feature_values

## Clean up the output of the notebook

In [None]:
import shutil
shutil.rmtree('debug')