***

# Taxi Trip Fare Prediction - Model 3

***

The goal of this example is to build on the Model 2 example and add features from real-time streaming data. We will
- train an ML model based on historical taxi trip fare data and contextual features
- add real-time window aggregate features calculated on real-time streaming data
- serve the ML model to predict the trip fare for new trips

### Prepare your data

In the Model 1 example we used the trip table from an S3 bucket as the training dataset.

### Prepare your static contextual feature data

In the Model 2 example we enhanced the data by adding three contextual feature tables. 

- an hourly segment table that maps an hour to an hourly_segment. 
- a holiday weekend table that maps a date to a flag indicating whether that date was a holiday-or-weekend or neither.
- a geo area table that maps a zipcode to a type of geo area.

### Prepare your real-time contextual feature data

In this example we will connect to a real-time stream of trip events and add a real-time contextual feature.

- the total passenger count for a zipcode within the last 4 hours

The idea is that the total passenger count for all trips starting at a zipcode within the last 4 hours indicates the recent demand at that zipcode, and that this demand influences the trip fare amount. Adding this feature should help us create a more accurate ML model.

***

**We will reuse the `trip_fare` project from Model 1 for this example.**

In [None]:
set project trip_fare

***

# Connect your Data Sources

<html><img src="../../images/trip_fare_images/3_1.png"/></html>

In the Model 1 example we connected the S3 bucket to Foresight as a data source for the trip table. In the Model 2 example we connected the same S3 bucket as a data source for the contextual feature tables. In this step we will continue to use the same data sources for training the model. Later, during the model serving step, we will connect the real-time data from Kafka to Foresight. This will allow Foresight to read historical real-time data from S3 and current real-time data from Kafka.

***

# Create a Feature Set for real-time aggregated features

<html><img src="../../images/trip_fare_images/3_2.png"/></html>

In this step we will create a feature set to generate and store aggregated features in Foresight storage based on the real-time data source. Remember that the real-time contextual feature that we are trying to generate is
- the total passenger count in the last 4 hours for any zipcode

To generate this feature, the feature set needs to compute the sum aggregate of the passenger_count for all trips, grouped by the pickup_zipcode. In SQL terms the query looks like this,

SELECT SUM(passenger_count) FROM \<source data\> GROUP BY pickup_zipcode 
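
As a rough illustration of what this aggregate computes, the sketch below sums `passenger_count` per `pickup_zipcode` over a 4-hour window in plain Python. It is not how Foresight implements the feature set; the function name `passenger_count_by_zipcode`, the in-memory list of trips, and the sample values are assumptions made for this example.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def passenger_count_by_zipcode(trips, now, window=timedelta(hours=4)):
    """Sum passenger_count per pickup_zipcode for trips in (now - window, now]."""
    totals = defaultdict(int)
    for trip in trips:
        # Keep only trips whose pickup time falls inside the window.
        if now - window < trip["pickup_datetime"] <= now:
            totals[trip["pickup_zipcode"]] += trip["passenger_count"]
    return dict(totals)


# Made-up sample trips, for illustration only.
trips = [
    {"pickup_datetime": datetime(2022, 11, 12, 8, 0), "pickup_zipcode": "10069", "passenger_count": 2},
    {"pickup_datetime": datetime(2022, 11, 12, 10, 30), "pickup_zipcode": "10069", "passenger_count": 3},
    {"pickup_datetime": datetime(2022, 11, 12, 11, 0), "pickup_zipcode": "10107", "passenger_count": 1},
    # This trip is older than 4 hours at 11:29 and is excluded.
    {"pickup_datetime": datetime(2022, 11, 12, 5, 0), "pickup_zipcode": "10069", "passenger_count": 4},
]

print(passenger_count_by_zipcode(trips, now=datetime(2022, 11, 12, 11, 29)))
# prints {'10069': 5, '10107': 1}
```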

### Create a Foresight ML job file to generate a feature set

The feature set will be created using a Foresight ML job file. The `using_foresight_options` section of the Foresight ML job file is where you specify the SQL aggregation function and the GROUP BY entities. Create a Foresight ML job file using the templates and code snippets available at the icons to the left. Refer to the Foresight User Manual for help.

In [None]:
!cat trip_fare_prediction_model_3/trip_events_context.ml

### Create a feature set

Use the `start featureset` command to execute the Foresight ML job file to create the feature set. This command will start a job that creates the feature set tables within Foresight, and inserts data into the Foresight tables from the data source. The job will perform aggregations on the data as it is being fetched. It will fetch data from the historical backfill source as well as the real-time streaming source. The job continues to run forever as the data source is a real-time kafka stream. The `status featureset` command will show the status of the feature set.


In [None]:
start featureset trip_fare_prediction_model_3/trip_events_context

In [None]:
status featureset trip_events_context

In [None]:
start featureset trip_fare_prediction_model_3/trip_events_aggr1

In [None]:
list featuresets

In [None]:
status featureset trip_events_aggr1

In [None]:
display featureset trip_events_context

***

# Create a Feature View to serve contextual features

<html><img src="../../images/trip_fare_images/3_3.png"/></html>

In the Model 2 example we created a feature view to serve four contextual features from Foresight feature sets. In this step we will create a feature view to serve those four contextual features as well as a fifth real-time contextual feature from the feature set that we just created. The feature view will output the following contextual features
- the hourly_segment for a given hour of day
- the holiday_or_weekend flag for a given date
- the pickup_geo_area for a given pickup zipcode
- the dropoff_geo_area for a given dropoff zipcode
- the total passenger count in the last 4 hours for a given zipcode

### Create a Foresight ML job file to generate a feature view

The feature view will be created using a Foresight ML job file. The `using_foresight_options` section of the Foresight ML job file is where you specify the aggregated feature name and the window for aggregation.  Create a Foresight ML job file using the templates and code snippets available at the icons to the left. Refer to the Foresight User Manual for help.

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_feature_view.ml

### Start serving contextual features

Use the `start featureview` command to execute the Foresight ML job file to start serving contextual features for the feature view. This command starts a job to serve the feature view. Use the `offline` option to serve features for training dataset creation and the `online` option to serve features for prediction. 

The `status featureview` command will show the status of the feature view. The *`feature_status`* element indicates the availability of feature data. A feature status of "OK" indicates that feature data is available. A feature status of "DATA_NOT_READY" indicates that data is still being fetched from historical or stream sources and is not up-to-date for consumption.

**It may take up to 10 minutes for the *`feature_status`* to show "OK" due to the size and duration of the historical feature data.** While the feature status shows **"DATA_NOT_READY"**, the historical data is still being fetched and processed and has not yet caught up to the current time. Wait until the feature status shows "OK" before proceeding to the next steps.
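
The wait described above amounts to a simple polling loop. The sketch below shows one way to express it in Python. The `get_status` callable is hypothetical: it stands in for whatever mechanism you use to read the *`feature_status`* value (in this notebook, that is the `status featureview` command). The timeout and polling interval are also assumptions.

```python
import time


def wait_for_feature_status(get_status, timeout_s=600, poll_interval_s=15, sleep=time.sleep):
    """Poll get_status() until it returns "OK" or the timeout elapses.

    Returns True if the status reached "OK", False on timeout.
    """
    waited = 0
    while True:
        if get_status() == "OK":
            return True
        if waited >= timeout_s:
            return False
        sleep(poll_interval_s)
        waited += poll_interval_s


# Demonstration with a fake status source that becomes ready on the third poll.
fake_statuses = iter(["DATA_NOT_READY", "DATA_NOT_READY", "OK"])
ready = wait_for_feature_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(ready)
```

Passing `sleep` as a parameter keeps the demonstration instant; in real use the default `time.sleep` applies.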

In [None]:
start featureview trip_fare_prediction_model_3/trip_fare_3_feature_view,offline

In [None]:
start featureview trip_fare_prediction_model_3/trip_fare_3_feature_view,online

In [None]:
status featureview trip_fare_3_feature_view,offline

In [None]:
status featureview trip_fare_3_feature_view,online

### Explore feature sets and feature views

Explore the feature sets and feature views that you created using `Foresight Explorer`. The `Foresight Explorer` tool can be opened by clicking on the following icon in the Launcher page. 

<html><img src="../../images/trip_fare_images/2_7.png"/></html>

Navigate to the `Foresight Explorer` web page and open the `trip_fare` project. Explore the feature sets and feature views within that project.

***

# Create a Training Dataset

<html><img src="../../images/trip_fare_images/3_4.png"/></html>

In this step we will create a training dataset using the trip table data source and the contextual features. We will use the pickup_zipcode, dropoff_zipcode and passenger_count as input features to the ML model. We will use the ***contextual_feature_fetch*** UDF to fetch the hourly_segment, the is_holiday_or_weekend flag and the total_passenger_count_4hr from the feature view and use those as additional inputs to the ML model. The fare_amount will be the target or label for the ML model to train. 
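
Conceptually, each training row combines columns taken directly from the trip table with contextual features looked up by key. The sketch below illustrates that shape in plain Python. It is not the implementation of the ***contextual_feature_fetch*** UDF: the lookup dictionaries, the segment name `late_morning`, the sample values, and the function `build_training_row` are all hypothetical.

```python
from datetime import datetime

# Hypothetical lookup tables standing in for the feature view contents.
HOURLY_SEGMENT = {11: "late_morning"}
HOLIDAY_OR_WEEKEND = {"2022-11-12": True}
PASSENGERS_LAST_4HR = {"10069": 42}


def build_training_row(trip):
    """Combine raw trip columns with looked-up contextual features."""
    pickup = datetime.strptime(trip["pickup_datetime"], "%Y-%m-%d %H:%M:%S")
    return {
        # Inputs taken directly from the trip table
        "pickup_zipcode": trip["pickup_zipcode"],
        "dropoff_zipcode": trip["dropoff_zipcode"],
        "passenger_count": trip["passenger_count"],
        # Inputs looked up by hour, date, and pickup zipcode
        "hourly_segment": HOURLY_SEGMENT.get(pickup.hour),
        "is_holiday_or_weekend": HOLIDAY_OR_WEEKEND.get(pickup.date().isoformat(), False),
        "total_passenger_count_4hr": PASSENGERS_LAST_4HR.get(trip["pickup_zipcode"], 0),
        # Target (label) for training
        "fare_amount": trip["fare_amount"],
    }


sample_trip = {
    "pickup_datetime": "2022-11-12 11:29:05",
    "pickup_zipcode": "10069",
    "dropoff_zipcode": "10107",
    "passenger_count": 3,
    "fare_amount": 12.5,  # made-up value
}
print(build_training_row(sample_trip))
```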

### Create a Foresight ML job file to generate a training dataset

The training dataset will be created using a SQL command. SQL commands can be executed via Foresight ML job files. Create a Foresight ML job file using the templates and code snippets available at the icons to the left. Refer to the Foresight User Manual for help.

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_train_dataset.ml

### Create the dataset

Use the `start dataset` command to execute the Foresight ML job file to create the training dataset. The `status dataset` command will show the current status of dataset generation: "RUNNING", "COMPLETED" or "ERROR". The `list datasets` command will list the created datasets within a project. The `display dataset` command will display the first few rows of the training dataset.

**This command may take up to 10 minutes due to the size of the dataset.**

In [None]:
start dataset trip_fare_prediction_model_3/trip_fare_3_train_dataset

In [None]:
status dataset trip_fare_3_train_dataset

In [None]:
list datasets

In [None]:
display dataset trip_fare_3_train_dataset

### Explore the dataset

Use the `explore data-quality` command to visually explore the dataset using the Foresight data explorer. The `target_column` is the target or label for ML training. Click on the output url to visualize the dataset.

**This command may take a few minutes due to the size of the dataset.**

In [None]:
explore data-quality trip_fare_prediction_model_3/trip_fare_3_train_dataset_data_quality

***

# Train an ML Model

<html><img src="../../images/trip_fare_images/3_5.png"/></html>

In this step we will train an ML model using the training dataset that was created. We will use the pickup_zipcode, dropoff_zipcode, passenger_count, distance, hourly_segment, is_holiday_or_weekend, and total_passenger_count_4hr as input features to the ML model. The fare_amount will be the target or label for the ML model to train. 

### Create a Foresight ML job file for model training

ML model training is initiated via a Foresight ML job file which specifies the ML training parameters. Create a Foresight ML job file using the templates and code snippets available at the icons to the left. Refer to the Foresight User Manual for help.

### Start ML model training

Use the `start training` command to execute the Foresight ML job file to start the model training. The `status training` command will show the status of the model training. 

### Machine Learning

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_ml_model_train.ml

In [None]:
start training trip_fare_prediction_model_3/trip_fare_3_ml_model_train,limit=2000

**Click the url shown in the output of the `status training` command to open an *MLflow* session that displays the training metrics.**

#### Wait for ML model training to complete

Use the `status training` command to check the status of the model training. Wait until the status shows COMPLETED.

**Training could take 10 minutes or more to complete.**

In [None]:
status training trip_fare_3_ml_model_train

### Deep Learning

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_dl_model_train.ml

In [None]:
start training trip_fare_prediction_model_3/trip_fare_3_dl_model_train

**Click the url shown in the output to open a *TensorBoard* session that displays the training progress and metrics.** After opening the *TensorBoard* url, click the reload button at the top right of the *TensorBoard* page.

**Training could take 10 minutes or more to complete.**

In [None]:
status training trip_fare_3_dl_model_train

#### Note:
TensorBoard is only available for Deep Learning models.

In [None]:
list tensorboard "<model name>,<run_id>"

## Register a trained ML model

After the training is complete, the `status training` command will show COMPLETED status. The trained ML model must be registered before it can be used for predictions. The `list trained-models` command will list all the trained models within a project. The `register model` command will register a trained model. The `list registered-models` command will list all registered models within a project.

##### To list all the ML models that have been trained

In [None]:
list trained-models trip_fare_3_ml_model

##### To list all the DL models that have been trained

In [None]:
list trained-models trip_fare_3_dl_model

##### Run this cell to register the machine learning model

In [None]:
register model trip_fare_3_ml_model,1,PRODUCTION

##### Run this cell to register the deep learning model

In [None]:
register model trip_fare_3_dl_model,1,PRODUCTION

#### To list all registered models

In [None]:
list registered-models

For serving, we will calculate real-time contextual features that are aggregated over streaming data. To do this, we will first publish the streaming events and then add the streaming source to the feature set, which allows it to run aggregations on real-time data.

Real-time trip events are published to a kafka stream using the trip table csv file. We will use the same trip table csv file for historical data to train the ML model as well as for real-time data when serving the ML model.

First we will fetch the trip table csv using wget and peek at a few lines of data from the csv file. The data includes the pickup datetime, pickup latitude, longitude, dropoff latitude, longitude, pickup and dropoff zipcodes, passenger count and fare amount. We will print the first few lines using the `head` command.

In [None]:
!wget "https://foresight-tutorial.s3.us-west-2.amazonaws.com/trip_fare/trip_table.csv"

In [None]:
!head -n 5 trip_table.csv

### Publish your real-time contextual feature data

We will use kafka as the data source for the real-time trip events. We will publish trip events to a kafka broker. Use `kafka-producer-csv.py` to publish trip events from the trip table csv file to the kafka broker. This program takes each trip event from the trip_table csv file, updates the timestamps to reflect the current time and continuously publishes events to a kafka broker. It is a long running program and must be executed in the background, otherwise it will prevent other notebook cells from being executed until it completes. The program accepts the following options:
- `-b` specifies the URL of the kafka broker
- `-i` specifies the input csv file name
- `-e` specifies the duration in minutes to publish events (the default is 15 minutes)
- `-g` obtains the kafka broker address and topic
- `-h` displays help
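
To make the timestamp update concrete, the sketch below shows one possible way to replay historical CSV rows as if they were happening now: every timestamp is shifted by the same offset so that the earliest event lands at the current time and the gaps between events are preserved. How `kafka-producer-csv.py` actually rewrites timestamps is not documented here, so treat this purely as an assumption. The function `shift_to_now` and the two-row sample are hypothetical, and no Kafka client is involved.

```python
import csv
import io
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def shift_to_now(rows, now, column="pickup_datetime"):
    """Shift all timestamps so the earliest event lands at `now`."""
    rows = list(rows)
    if not rows:
        return rows
    times = [datetime.strptime(row[column], TIME_FORMAT) for row in rows]
    offset = now - min(times)
    shifted = []
    for row, event_time in zip(rows, times):
        new_row = dict(row)  # leave the input row untouched
        new_row[column] = (event_time + offset).strftime(TIME_FORMAT)
        shifted.append(new_row)
    return shifted


# Made-up two-row sample in place of trip_table.csv.
sample_csv = (
    "pickup_datetime,passenger_count\n"
    "2022-11-12 11:00:00,3\n"
    "2022-11-12 11:05:00,1\n"
)
events = shift_to_now(csv.DictReader(io.StringIO(sample_csv)), now=datetime(2024, 1, 1, 9, 0, 0))
for event in events:
    print(event["pickup_datetime"], event["passenger_count"])
```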

Note the `kafka source meta` from the output below. It will be used later to connect the kafka topic to the Foresight platform.

The second command below, which starts with an `&`, will publish trip events in the background to the kafka broker for a duration of 60 minutes.

In [None]:
!kafka-producer-csv.py -b <kafka server>:9092 -i trip_table.csv -g

In [None]:
&kafka-producer-csv.py -b <kafka server>:9092 -i trip_table.csv -e 60

### Add Real-Time Data Source

<html><img src="../../images/trip_fare_images/3_6.png"/></html>

Data sources are connected to Foresight via a Foresight ML sources file. In the Model 1 and 2 examples we have created a Foresight ML sources file to connect the S3 bucket as a source to the Foresight platform. Create another Foresight ML sources file to add a kafka source. Use the templates and code snippets available at the icons to the left. Refer to the Foresight User Manual for help.
Alternatively you may use the Foresight ML sources file from this tutorial.

**Make sure you update the Foresight ML sources file with the correct kafka broker url and topic obtained from the *"Publish your real-time contextual feature data"* step above.**
<br>The relevant section in the `trip_fare_3_data_sources.yml` file looks like this:
    
         meta:
              source_type: kafka
              source_format: csv
              url: <kafka server>:9092                                              <<<
              topic: tutorial_client_<xxxx_xxxxxx>_trip_table                       <<<
              offset: latest
              streaming_window: 2 seconds
              preprocessor: com.aizen.preprocessors.DefaultCsvPreprocessor


In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_data_sources.yml

#### Add column schema to your data sources file

Foresight can automatically infer column schema from your data sources and update the ML sources file. Use the `add columns` command to automatically infer and update the ML sources file with the data source column schema. After this command completes, you must review the column schema for correctness and if necessary edit the ML sources file to fix column names or data types. Alternatively you may manually edit the ML sources file and add all the column names and data types to match your data source schema.

In [None]:
add columns trip_fare_prediction_model_3/trip_fare_3_data_sources.yml

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_data_sources.yml

We have connected Kafka as a data source through the sources file. We will now add Kafka as a source to the feature set, which enables the Foresight feature set service to read data from Kafka and perform aggregations on the streaming data. Run the cell below to alter the feature set so that it calculates aggregates on streaming data alongside the backfill data.

In [None]:
alter featureset trip_events_context,"add source trip_fare_prediction_model_3/trip_fare_3_data_sources.yml:kafka.trip_events"

To remove Kafka as a source from the feature set, replace "add" with "remove" in the above command.

***

# Serve an ML Model

<html><img src="../../images/trip_fare_images/3_7.png"/></html>

In this step we will deploy the trained ML model to serve prediction requests. 

### Create a Foresight ML job file for model serving

ML models are deployed via a Foresight ML job file which specifies the ML serving options. 

Create a Foresight ML job file using the registered-model version that you want to serve. 

The `create prediction` command takes two required parameters: the registered-model name and the model version. The `dir` parameter specifies the location where the generated files will be saved. The command will generate three files: a Foresight ML job file, a sources yaml file and a file of sample curl requests. Refer to the Foresight User Manual for help.

The sources yaml will contain definitions for two REST sources, one for the prediction REST request and one for the prediction REST response and a definition for the prediction log table.

**In the command below, replace the version '1' with the version of the registered model you are using.**

The following cells use the machine learning model to serve predictions. If you choose to use the deep learning model instead, replace `trip_fare_3_ml_model` with `trip_fare_3_dl_model`.

In [None]:
create prediction trip_fare_3_ml_model,1,dir=trip_fare_prediction_model_3/

***

### Inspect the model serving files

Inspect the model serving ML job file and the definitions for the prediction REST request, prediction REST response and the prediction log table.

**Note: The generated file names include the model version number as shown below. In the commands below, replace the version '1' with the version of the registered model you are using.**

    Example : <model name>_<version>_serve.ml , <model name>_<version>_sources.yml

    

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_ml_model_1_serve.ml

In [None]:
!cat trip_fare_prediction_model_3/trip_fare_3_ml_model_1_sources.yml

### Deploy the model

Use the `start prediction` command to execute the Foresight ML job file to deploy a model. The `status prediction` command will show the status of the model serving. The url shown in the output is the endpoint to which REST prediction requests may be sent via `curl` or some other means.

In [None]:
start prediction trip_fare_prediction_model_3/trip_fare_3_ml_model_1_serve

In [None]:
status prediction trip_fare_3_ml_model_1_serve

## Predict trip fare amounts

Use the `test prediction` command to send prediction requests to the deployed model. By default, the command uses the last 10 rows from the training dataset as prediction request data and sends curl requests to the deployed model. The prediction responses are collected and displayed.

Refer to the Foresight User Manual for help.

Note: Once you run the `start prediction` command, a prediction service starts and is ready to serve requests. You can send curl requests to the URL that the prediction service reports. Running `test prediction` also outputs an "Example Curl Request". Use this example to send data to the prediction service, or integrate it into applications that consume the predictions.

In [None]:
test prediction trip_fare_3_ml_model_1_serve

Below is a markdown cell that shows how to run the curl request to fetch predictions. Convert the cell to a Code cell, enter the prediction URL in the marked placeholder, and execute the cell to get a response.

!curl -X GET ">enter the prediction URL here<" -H "Content-Type: application/json" -d '[{"rest_request_id": "prediction_test-1", "pickup_datetime": "2022-11-12 11:29:05", "pickup_zipcode": "10069", "dropoff_zipcode": "10107", "passenger_count": 3}]'
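
If you prefer to call the prediction endpoint from application code rather than `curl`, the sketch below builds an equivalent request using only the Python standard library. It mirrors the curl example above (a GET request with a JSON body) and assumes the endpoint accepts exactly that. The function name `build_prediction_request` is hypothetical, and `http://localhost:8080/predict` is only a placeholder that you must replace with the URL reported by `status prediction`.

```python
import json
import urllib.request


def build_prediction_request(prediction_url, trips):
    """Build a GET request with a JSON body, mirroring the curl example."""
    body = json.dumps(trips).encode("utf-8")
    return urllib.request.Request(
        prediction_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="GET",
    )


trips = [
    {
        "rest_request_id": "prediction_test-1",
        "pickup_datetime": "2022-11-12 11:29:05",
        "pickup_zipcode": "10069",
        "dropoff_zipcode": "10107",
        "passenger_count": 3,
    }
]

# Placeholder URL (an assumption): replace with your prediction URL.
request = build_prediction_request("http://localhost:8080/predict", trips)
print(request.get_method(), request.full_url)

# To actually send the request once the URL is filled in:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```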

### Stop the deployed model

Use the `stop prediction` command to stop ML model serving when you have completed the prediction requests. This step is optional, you may choose to leave the model deployed.

In [None]:
stop prediction trip_fare_3_ml_model_1_serve