***

# Taxi Trip Fare Prediction - Model 3

***

The goal of this example is to build on the Model 2 example by adding real-time features from the Events Data Sink. We will:
- train an ML model based on the historical trip fare Events Data Sink and the three Static Data Sinks
- add real-time window aggregate features calculated on real-time streaming data to the Events Data Sink
- serve the ML model to predict the trip fare for new trips

### Prepare your data

In the Model 1 example we used the trip table from an S3 bucket to configure a data source and a data sink.

### Prepare your static contextual feature data

In the Model 2 example we enhanced the data by adding three static data sinks from three data sources. 

- an hourly segment table that maps an hour to an hourly_segment. 
- a holiday weekend table that maps a date to a flag indicating whether that date was a holiday-or-weekend or neither.
- a geo area table that maps a zipcode to a type of geo area.

### Prepare your real-time contextual feature data

In this example we will add a contextual feature that is a time-window aggregate function on the Events Data Sink:

- the total passenger count for a zipcode within the last 4 hours

The idea is that the total passenger count for all trips starting at a zipcode within the last 4 hours indicates the recent demand at that zipcode and influences the trip fare amount. With this additional feature we can create a more accurate ML model.
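The feature can be illustrated with a minimal Python sketch. This is not how Aizen computes the aggregate; it only shows the arithmetic. The sample trips are made up, the function name is hypothetical, and the sketch assumes both ends of the 4-hour window are inclusive.

```python
from datetime import datetime, timedelta

# Hypothetical trip events; the values are made up for illustration.
trips = [
    {"pickup_datetime": datetime(2022, 11, 12, 6, 0), "pickup_zipcode": "10069", "passenger_count": 2},
    {"pickup_datetime": datetime(2022, 11, 12, 9, 30), "pickup_zipcode": "10069", "passenger_count": 1},
    {"pickup_datetime": datetime(2022, 11, 12, 10, 45), "pickup_zipcode": "10107", "passenger_count": 4},
    {"pickup_datetime": datetime(2022, 11, 12, 11, 0), "pickup_zipcode": "10069", "passenger_count": 3},
]


def total_passenger_count_4hr(events, zipcode, as_of, window=timedelta(hours=4)):
    """Sum passenger_count for trips picked up at `zipcode` in [as_of - window, as_of].

    Assumption: both window boundaries are inclusive.
    """
    start = as_of - window
    return sum(
        event["passenger_count"]
        for event in events
        if event["pickup_zipcode"] == zipcode
        and start <= event["pickup_datetime"] <= as_of
    )


as_of = datetime(2022, 11, 12, 11, 29, 5)
# The 06:00 trip falls outside the window, so only the 09:30 and 11:00 trips count.
print(total_passenger_count_4hr(trips, "10069", as_of))
```

A zipcode with no trips in the window yields 0 in this sketch.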

***

**We will reuse the `trip_fare` project from Model 1 for this example.**

In [None]:
set project trip_fare

***

# Configure Data Sources

<html><img src="../../images/trip_fare_images/3_1.png"/></html>

In the Model 1 example we configured the trip table csv file from an S3 bucket as a data source to Aizen. In the Model 2 example we configured three more data sources corresponding to the three static csv files in the S3 bucket. Model 3 has no further data sources, so there is nothing to do for this step.


***

# Configure Data Sinks

<html><img src="../../images/trip_fare_images/3_2.png"/></html>

In the Model 1 example we configured an Events Data Sink against the trip table data source. In the Model 2 example we configured three Static Data Sinks against the three data sources providing contextual data. Model 3 has no further data sources or data sinks, so there is nothing to do for this step.


***

# Create a Training Dataset

<html><img src="../../images/trip_fare_images/3_3.png"/></html>

In this step we will create a training dataset from the data sinks. In the Model 1 and Model 2 examples, we added the pickup_zipcode, dropoff_zipcode, passenger_count, hourly_segment, is_holiday_or_weekend, pickup_geo_area and dropoff_geo_area as input features to the ML model. The fare_amount is the target or label for the ML model to train and was added as a label feature. All features were drawn from the Events Data Sink and the three Static Data Sinks.
<br>Additionally, for Model 3 we will define a contextual feature 'total_passenger_count_4hr' from the Events Data Sink. This feature will be defined as the sum of the passenger_count over the last 4 hours for a given pickup zipcode.

## Building Datasets from Data Sinks

<html><img src="../../images/trip_fare_images/3_4.png"/></html>

<br>Basis features are sourced from a single data sink. Contextual features are retrieved from data sinks using join keys from the basis features.
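The join between basis and contextual features can be pictured with a small Python sketch. The lookup tables and their values are hypothetical stand-ins for the three Static Data Sinks, the function name is made up, and the calendar-day format is an assumption; Aizen performs the actual join internally.

```python
from datetime import datetime

# Hypothetical stand-ins for the Static Data Sinks (values are illustrative).
hour_of_day_sink = {11: "midday"}                 # hour_of_day -> hourly_segment
holiday_weekend_sink = {"2022-11-12": True}       # calendar_day -> is_holiday_or_weekend
geo_area_sink = {"10069": "residential", "10107": "commercial"}  # zipcode -> geo_area


def build_feature_row(basis):
    """Append contextual features to a basis row using derived join keys."""
    ts = datetime.strptime(basis["pickup_datetime"], "%Y-%m-%d %H:%M:%S")
    hour_key = ts.hour                 # plays the role of pickup_datetime_hour_of_day
    day_key = ts.date().isoformat()    # plays the role of pickup_datetime_calendar_day (format assumed)

    row = dict(basis)
    row["hourly_segment"] = hour_of_day_sink.get(hour_key)
    row["is_holiday_or_weekend"] = holiday_weekend_sink.get(day_key)
    row["pickup_geo_area"] = geo_area_sink.get(basis["pickup_zipcode"])
    row["dropoff_geo_area"] = geo_area_sink.get(basis["dropoff_zipcode"])
    return row


basis_row = {
    "pickup_datetime": "2022-11-12 11:29:05",
    "pickup_zipcode": "10069",
    "dropoff_zipcode": "10107",
    "passenger_count": 3,
}
feature_row = build_feature_row(basis_row)
print(feature_row)
```

Note that the same geo area table is joined twice with different keys, which is why the two geo features need distinct names.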

Datasets are configured via the `configure dataset` command. This command will prompt for various settings. The relevant information for this command is shown below. Enter this information in the prompts:
    
            Dataset: New                  Dataset Name: trip_dataset_3
            Feature: Create New
            Feature Type: Basis
            Data Sink: trip_datasink
            Feature: pickup_datetime
            Is Label: unchecked (false)   Materialize: checked (true)
           
Click the `Add Feature` button to add the pickup_datetime input feature. Continue to add all features with the following information in the prompts:

            Dataset: New
            Feature: Create New
            Feature Type: Basis
            Data Sink: trip_datasink
            Feature: pickup_zipcode
            Is Label: unchecked (false)   Materialize: checked (true)
           
Click the `Add Feature` button to add the pickup_zipcode input feature.

            Feature: Create New
            Feature Type: Basis
            Data Sink: trip_datasink
            Feature: dropoff_zipcode
            Is Label: unchecked (false)   Materialize: checked (true)

Click the `Add Feature` button to add the dropoff_zipcode input feature.           

            Feature: Create New
            Feature Type: Basis
            Data Sink: trip_datasink
            Feature: passenger_count
            Is Label: unchecked (false)   Materialize: checked (true)
            
Click the `Add Feature` button to add the passenger_count input feature.

            Feature: Create New
            Feature Type: Basis
            Data Sink: trip_datasink
            Feature: fare_amount
            Is Label: checked (true)      Materialize: checked (true)

Click the `Add Feature` button to add the fare_amount output feature.

            Feature: Create New
            Feature Type: Expression
            Name: pickup_datetime_hour_of_day
            Is Label: unchecked (false)   Materialize: unchecked (false)
            Expression: checked (true)
            Built-in Expression:
            Expression: pickup_datetime.getHours()

Click the `Add Feature` button to add the pickup_datetime_hour_of_day key.

            Feature: Create New
            Feature Type: Expression
            Name: pickup_datetime_calendar_day
            Is Label: unchecked (false)   Materialize: unchecked (false)
            Expression: checked (true)
            Built-in Expression:
            Expression: pickup_datetime.getCalendarDay()

Click the `Add Feature` button to add the pickup_datetime_calendar_day key.

            Feature: Create New
            Feature Type: Contextual
            Is Label: unchecked (false)   Materialize: checked (true)
            Expression: unchecked (false)
            Data Sink: hour_of_day_datasink
            Value: hourly_segment
    Join Key Map: 
            hour_of_day: pickup_datetime_hour_of_day

Click the `Add Feature` button to add the hourly_segment input feature.

            Feature: Create New
            Feature Type: Contextual
            Is Label: unchecked (false)   Materialize: checked (true)
            Expression: unchecked (false)
            Data Sink: holiday_weekend_datasink
            Entities: calendar_day
            Value: is_holiday_or_weekend
    Join Key Map: 
            calendar_day: pickup_datetime_calendar_day
        
Click the `Add Feature` button to add the is_holiday_or_weekend input feature.

            Feature: Create New
            Feature Type: Contextual
            Name: pickup_geo_area
            Is Label: unchecked (false)   Materialize: checked (true)
            Expression: unchecked (false)
            Data Sink: geo_area_datasink
            Value: geo_area
    Join Key Map: 
            zipcode: pickup_zipcode
        
Click the `Add Feature` button to add the pickup_geo_area input feature.

            Feature: Create New
            Feature Type: Contextual
            Name: dropoff_geo_area
            Is Label: unchecked (false)   Materialize: checked (true)
            Expression: unchecked (false)
            Data Sink: geo_area_datasink
            Value: geo_area
    Join Key Map: 
            zipcode: dropoff_zipcode
        
Click the `Add Feature` button to add the dropoff_geo_area input feature.

            Feature: Create New
            Feature Type: Contextual
            Name: total_passenger_count_4hr
            Is Label: unchecked (false)   Materialize: checked (true)
            Expression: unchecked (false)
            Data Sink: trip_datasink
            Source Column or Expression: passenger_count
            Condition:
            Function: sum
            Timestamp: pickup_datetime
            Window Start: -4h
            Window End: 0h
    Join Key Map: 
            pickup_zipcode: pickup_zipcode
        
Click the `Add Feature` button to add the total_passenger_count_4hr input feature.
<br>Click the `Save Configuration` button followed by the `OK` button to configure the dataset.

In [None]:
configure dataset

### Configure a Resource to create the Dataset

We will configure a resource to run the dataset job that materializes the dataset table. Resources are configured via the `configure resource` command. This command will prompt for various settings. The relevant information for this command is shown below. Enter this information in the prompts:
    
            Job Type: dataset              Dataset Name: trip_dataset_3           Resource Type: dataset        Select Resource: New     Resource Name: trip_dataset_resource
            Select "Basic Settings"        Dataset Size: 0-1M rows
            
Click the `Save Configuration` button to configure the resource for the dataset.

In [None]:
configure resource

### Create the Dataset

Use the `start dataset` command to materialize the configured dataset into a training dataset table. The `status dataset` command will show the current status of dataset generation: "RUNNING", "COMPLETED" or "ERROR". The `list datasets` command will list the created datasets within a project. The `display dataset` command will display the first few rows of the training dataset.

**This command may take up to 10 minutes due to the size of the dataset.**

In [None]:
start dataset trip_dataset_3

In [None]:
status dataset trip_dataset_3

In [None]:
list datasets

In [None]:
display dataset trip_dataset_3

### Data analysis on the dataset

Use the `loader` command to load the dataset for visual exploration. Run the `loader` command, click the `Datasets` button and select the `trip_dataset_3` table.

Click the `Load Table` button to load the dataset.

In [None]:
loader

In [None]:
show stats

In [None]:
plot

***

# Train an ML Model

<html><img src="../../images/trip_fare_images/3_5.png"/></html>

In this step we will train an ML model using the training dataset that was created. We will use the pickup_zipcode, dropoff_zipcode, passenger_count, hourly_segment, is_holiday_or_weekend, pickup_geo_area, dropoff_geo_area and total_passenger_count_4hr as input features to the ML model. The fare_amount will be the target or label for the ML model. 

A Training Experiment must be configured to train a model. Experiments are configured via the `configure training` command. This command will prompt for various settings. We will configure a Deep Learning experiment for Model 3. The relevant information for this command is shown below. Enter this information in the prompts:
    
            Training Experiment: New                  Experiment Name: trip_dl_exp_3        Model Name: trip_fare_3_dl_model
            Select "Deep Learning"                    Select "Basic Settings"
            Dataset: trip_dataset_3                   Select Column: pickup_datetime        Click Remove Input Feature
                                                      Select Column: pickup_datetime_hour_of_day       Click Remove Input Feature
                                                      Select Column: pickup_datetime_calendar_day      Click Remove Input Feature
            Epochs: 15                                Early Stop: 1                         Batch Size: 2048
            
Click the `Save Configuration` button to save the Deep Learning experiment configuration.           

In [None]:
configure training

### Start ML model training

Use the `start training` command to run the training experiment. The `status training` command will show the status of the model training. 

### Deep Learning

In [None]:
start training trip_dl_exp_3

**Click the url shown in the output to open a *TensorBoard* session that displays the training progress and metrics.** After opening the *TensorBoard* url click on the reload button to the top right of the *TensorBoard* page.

**Training could take 10 minutes or more to complete.**

In [None]:
status training trip_dl_exp_3

#### Note:
 TensorBoard is only available for Deep Learning models   

In [None]:
list tensorboard "<model name>,<run_id>"

## Register a trained ML model

After the training is complete, the `status training` command will show COMPLETED status. The trained ML model must be registered before it can be used for predictions. The `list trained-models` command will list all the trained models within a project. The `register model` command will register a trained model. The `list registered-models` command will list all registered models within a project.

##### To list all the DL models that have been trained

In [None]:
list trained-models trip_fare_3_dl_model

##### Run this cell to register the deep learning model

In [None]:
register model trip_fare_3_dl_model,1,PRODUCTION

#### To list all registered models

In [None]:
list registered-models

## Start a real-time streaming event publisher

For serving we must calculate the 'total_passenger_count_4hr' feature from a real-time streaming data source of trip events. We will first create a real-time data stream by publishing trip event data. Real-time trip events are published to a kafka stream using the trip table csv file. We will use the same trip table csv file that we used for historical data, except that the event timestamp, which is the pickup_datetime, will be altered to the current time. This step is necessary only because we are simulating a test scenario for this tutorial. In the real world you will have real-time sources that are already publishing event data.

First we will fetch the trip table csv using wget and peek at a few lines of data from the csv file. The data includes the pickup datetime, pickup latitude, longitude, dropoff latitude, longitude, pickup and dropoff zipcodes, passenger count and fare amount. We will print the first few lines using the `head` command.

In [None]:
!wget "https://aizen-public.s3.us-west-2.amazonaws.com/trip_fare/trip_table.csv"

In [None]:
!head -n 5 trip_table.csv

### Publish real-time trip event data

We will use kafka as the data source for the real-time trip events and publish trip events to a kafka broker. Use `kafka-producer-csv.py` to publish trip events from the trip table csv file to the kafka broker. This program takes each trip event from the trip_table csv file, updates the timestamps to reflect the current time and continuously publishes events to a kafka broker. It is a long-running program and must be executed in the background; otherwise it will block other notebook cells from executing until it completes. The program accepts the following options:

- `-b` specifies the URL of the kafka broker
- `-i` specifies the input csv file name
- `-e` specifies the duration in minutes to publish events (default: 15 minutes)
- `-g` obtains the kafka broker address and topic
- `-h` displays help

Note the `kafka source meta` from the output below. It will be used later to connect the kafka topic to the Aizen platform.

The second command below, which starts with an `&`, will publish trip events in the background to the kafka broker for a duration of 60 minutes.

In [None]:
!kafka-producer-csv.py -b <kafka server>:9092 -i trip_table.csv -g

In [None]:
&kafka-producer-csv.py -b <kafka server>:9092 -i trip_table.csv -e 60
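To make the timestamp rewrite concrete, here is a minimal sketch of the idea in plain Python. It is not the source of `kafka-producer-csv.py`, whose internals are not shown in this tutorial. The function name, the sample rows and the choice to serialize each event as JSON are assumptions, and the sketch prints messages instead of sending them to a kafka broker.

```python
import csv
import io
import json
from datetime import datetime

# Hypothetical sample rows; the real trip_table.csv has more columns and rows.
SAMPLE_CSV = """pickup_datetime,pickup_zipcode,dropoff_zipcode,passenger_count,fare_amount
2022-11-12 11:29:05,10069,10107,3,12.5
2022-11-12 11:31:40,10107,10069,1,9.0
"""


def restamp_events(csv_text, now=None, timestamp_column="pickup_datetime"):
    """Yield each csv row as a JSON string with its event timestamp set to `now`."""
    stamp = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    for record in csv.DictReader(io.StringIO(csv_text)):
        record[timestamp_column] = stamp  # overwrite the historical timestamp
        yield json.dumps(record)


# A fixed `now` keeps the example reproducible; a publisher would use the wall clock.
messages = list(restamp_events(SAMPLE_CSV, now=datetime(2024, 9, 4, 8, 0, 0)))
for message in messages:
    print(message)
```

Because `csv.DictReader` returns every field as a string, numeric columns such as passenger_count are serialized as strings in this sketch; a real publisher may convert types before sending.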

# Configure and add a Real-Time Data Source

<html><img src="../../images/trip_fare_images/3_6.png"/></html>

In this step we will connect the kafka broker and topic as a data source to Aizen. This will allow Aizen to read events from the kafka topic. Data sources are connected to Aizen via the `configure datasource` command. This command will prompt for various settings. The relevant information for this command is shown below. Enter this information in the prompts:
    
            Source: New                Source Name: trip_realtime_datasource
            Source Description: taxi trip data
            Source Type: kafka
            Source Format: json
            Broker URL: <kafka server>:9092   # replace the '<kafka server>' using the output from the previous two cells
            Kafka Topic: <kafka topic>        # replace the '<kafka topic>' using the output from the previous two cells    
            Kafka Offset:
            Poll Freq:
            
Click the `Get Columns` button and review the source column schema. 
<br>Click the `Save Configuration` button to configure the datasource.

In [None]:
configure datasource

## Add a Real-Time Data Source to a Data Sink

In the Model 1 example, we configured the 'trip_datasink' against the 'trip_datasource', which is a Data Source of historical trip events. In this example, we have configured a kafka Data Source called 'trip_realtime_datasource', which is a Data Source of real-time trip events. We will now alter the 'trip_datasink' and add the 'trip_realtime_datasource' as a real-time source of data. This command will schedule a datasink job to periodically and continuously read real-time data from the kafka source and store it in the Data Sink. The dataset feature 'total_passenger_count_4hr', which was defined on the 'trip_datasink', will now provide materialized aggregates on real-time data from the kafka source.

Before adding the real-time data source to the data sink, we will mark an outage window indicating that there is no data from when the backfill ended up to the current time. This will speed up the process of loading data into the data sink. Replace the `end_timestamp` with the current date in the following command.

In [None]:
alter datasink trip_datasink,add outage_window start_timestamp="2022-11-28 00:00:00",end_timestamp="2024-09-04 00:00:00"
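If you prefer not to type the end timestamp by hand, the outage window command can be assembled with a small Python helper. This is a convenience sketch only: the function name is hypothetical, and it simply formats the command text shown above so that it can be pasted into a cell.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def outage_window_command(datasink, start, end=None):
    """Build the `alter datasink ... add outage_window` command text.

    `end` defaults to the current time when it is not supplied.
    """
    end = end or datetime.now()
    return (
        f"alter datasink {datasink},add outage_window "
        f'start_timestamp="{start.strftime(TIMESTAMP_FORMAT)}",'
        f'end_timestamp="{end.strftime(TIMESTAMP_FORMAT)}"'
    )


# Reproduces the example command; drop the `end` argument to use the current time.
command = outage_window_command(
    "trip_datasink", datetime(2022, 11, 28), datetime(2024, 9, 4)
)
print(command)
```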

In [None]:
alter datasink trip_datasink,"add source trip_realtime_datasource"

In [None]:
status datasink trip_datasink

***

# Serve an ML Model

<html><img src="../../images/trip_fare_images/3_7.png"/></html>

In this step we will deploy a trained ML model to serve prediction requests. We will deploy the Deep Learning model. A prediction deployment must be configured to deploy a model. Deployments are configured via the `configure prediction` command. This command will prompt for various settings.

The relevant information for this command is shown below. Enter this information in the prompts:
    
            Prediction: New                  Prediction Name: trip_dl_deploy_3        Model Name: trip_fare_3_dl_model       Model Version: 1
            Source Type: http
           
Click the `Save Configuration` button to save the prediction deployment configuration.

In [None]:
configure prediction

### Deploy the model

Use the `start prediction` command to run the deployment. The `status prediction` command will show the status of the model serving. The url shown in the output is the endpoint to which REST prediction requests may be sent via `curl` or some other means.

In [None]:
start prediction trip_dl_deploy_3

In [None]:
status prediction trip_dl_deploy_3

## Predict trip fare amounts

Use the `test prediction` command to send prediction requests to the deployed model. By default, the command takes the last 10 rows from the training dataset and sends them as curl prediction requests to the deployed model. The prediction responses are collected and displayed.

Note: running the `start prediction` command starts a prediction job that deploys the model. You can send curl requests to the deployed model at the URL shown by `status prediction`. The `test prediction` command also outputs an "Example Curl Request". Use this example to send data to the deployed model, or integrate its logic into applications that send prediction requests and interpret prediction responses.

In [None]:
test prediction trip_dl_deploy_3

## Building Input Features for Predictions

<html><img src="../../images/trip_fare_images/3_8.png"/></html>

When an application sends a prediction request, the basis input features are present in the prediction request. Any contextual features are fetched from data sinks and appended to the basis features before calling the model for a prediction. The labels or output features are returned in the prediction response.

The cell below is a Markdown cell showing how to run a Curl Request to fetch predictions. Convert the cell into the Code state, then replace the prediction URL in the text below and execute the cell to get a prediction response.

!curl -X POST ">enter the prediction URL here<" -H "Content-Type: application/json" -d '[{"rest_request_id": "prediction_test-1", "pickup_datetime": "2022-11-12 11:29:05", "pickup_zipcode": "10069", "dropoff_zipcode": "10107", "passenger_count": 3}]'
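For applications written in Python, the same request can be built with the standard library. This is a minimal sketch: the helper name is hypothetical and the URL is a placeholder that must be replaced with your prediction URL. The payload mirrors the curl example above, and the call that actually sends the request is left commented out.

```python
import json
import urllib.request


def build_prediction_request(url, rows):
    """Build a JSON POST request carrying a list of basis feature rows."""
    body = json.dumps(rows).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


rows = [
    {
        "rest_request_id": "prediction_test-1",
        "pickup_datetime": "2022-11-12 11:29:05",
        "pickup_zipcode": "10069",
        "dropoff_zipcode": "10107",
        "passenger_count": 3,
    }
]

# Placeholder URL (an assumption); use the URL reported by `status prediction`.
request = build_prediction_request("http://localhost:8080/predict", rows)

# Uncomment to send the request to a running deployment:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Only the basis features are sent; the contextual features are appended on the serving side as described above.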

### Stop the deployed model

Use the `stop prediction` command to stop ML model serving when you have completed the prediction requests. This step is optional; you may choose to leave the model deployed.

In [None]:
stop prediction trip_dl_deploy_3