Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Linear Regression for NYC Taxi Datasets with AzureML Pipelines 

In this notebook, you learn how to prepare data for regression modeling. You run various transformations to filter and combine two different NYC taxi datasets
using [Azure Machine Learning Pipelines](https://aka.ms/aml-pipelines). After data process steps, you add a final custom training step to train a regression model.  The trained model then registered to AzureML workspace.

## Prerequisites

*    [ A Kubernetes cluster deployed on Azure Stack Hub, connected to Azure through ARC](https://github.com/Azure/AML-Kubernetes/blob/master/docs/ASH/AML-ARC-Compute.md).
     

*     [Datastore setup in Azure Machine Learning workspace backed up by Azure Stack Hub storage account](https://github.com/Azure/AML-Kubernetes/blob/master/docs/ASH/Train-AzureArc.md) 


*      Last but not least, you need to be able to run a Notebook. 

   If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the configuration Notebook located at [here](https://github.com/Azure/MachineLearningNotebooks) first. This sets you up with a working config file that has information on your workspace, subscription id, etc.

## Initialize AzureML workspace

Initialize a [Workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace) object from the existing workspace you created in the Prerequisites step. `Workspace.from_config()` creates a workspace object from the details stored in `config.json`. 

If you haven't done already please go to `config.json` file and fill in your workspace information.

In [None]:
from azureml.core.workspace import Workspace,  ComputeTarget
from azureml.exceptions import ComputeTargetException

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep='\n')

## Download data for regression modeling

First, you will prepare data for regression modeling. you will leverage the convenience of Azure Open Datasets. Perform `pip install azureml-opendatasets` to get the open dataset package.  The Open Datasets package contains a class representing each data source (NycTlcGreen and NycTlcYellow) to easily filter date parameters before downloading.


### Fetch data
Begin by creating a dataframe to hold the taxi data. When working in a non-Spark environment, Open Datasets only allows downloading one month of data at a time with certain classes to avoid MemoryError with large datasets. To download a year of taxi data, iteratively fetch one month at a time, and before appending it to green_df_raw, randomly sample 500 records from each month to avoid bloating the dataframe. Then preview the data. To keep this process short, we are sampling data of only 1 month.


In [None]:
from azureml.opendatasets import NycTlcGreen, NycTlcYellow
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta

green_df_raw = pd.DataFrame([])
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

number_of_months = 1
sample_size = 5000

for sample_month in range(number_of_months):
    temp_df_green = NycTlcGreen(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    green_df_raw = green_df_raw.append(temp_df_green.sample(sample_size))

In [None]:
yellow_df_raw = pd.DataFrame([])
start = datetime.strptime("1/1/2016","%m/%d/%Y")
end = datetime.strptime("1/31/2016","%m/%d/%Y")

sample_size = 500

for sample_month in range(number_of_months):
    temp_df_yellow = NycTlcYellow(start + relativedelta(months=sample_month), end + relativedelta(months=sample_month)) \
        .to_pandas_dataframe()
    yellow_df_raw = yellow_df_raw.append(temp_df_yellow.sample(sample_size))

### See the data

In [None]:
from IPython.display import display

display(green_df_raw.head(5))
display(yellow_df_raw.head(5))

### Save data locally

In [None]:
import os
dataDir = "data"

if not os.path.exists(dataDir):
    os.mkdir(dataDir)

greenDir = dataDir + "/green"
yelloDir = dataDir + "/yellow"

if not os.path.exists(greenDir):
    os.mkdir(greenDir)
    
if not os.path.exists(yelloDir):
    os.mkdir(yelloDir)
    
greenTaxiData = greenDir + "/unprepared.csv"
yellowTaxiData = yelloDir + "/unprepared.csv"

green_df_raw.to_csv(greenTaxiData, index=False)
yellow_df_raw.to_csv(yellowTaxiData, index=False)

print("Data written to local folder.")

## Prepare the dataset

Your next step is to upload these files to datastore of the workspace, and then registered as dataset in the workspace. 
  
"datastore_name" is the name of the datastore you setup in [this step](https://github.com/Azure/AML-Kubernetes/blob/master/docs/ASH/Train-AzureArc.md).

Upload and dataset registration take less than 1 min.

In [None]:
from azureml.core import Workspace, Datastore, Dataset

datastore_name = "<NAME_OF_ASH_HOSTED_AML_DATASTORE>"
datastore =  Datastore.get(ws, datastore_name)

dataDir = "data"
greenDir, yelloDir  = dataDir + "/green", dataDir + "/yellow"
greenTaxiData, yellowTaxiData = greenDir + "/unprepared.csv", yelloDir + "/unprepared.csv"

datastore.upload_files([greenTaxiData], 
                           target_path = 'green', 
                           overwrite = True, 
                           show_progress = True)

datastore.upload_files([yellowTaxiData], 
                           target_path = 'yellow', 
                           overwrite = True, 
                           show_progress = True)

print("Upload calls completed.")

# register datasets

dataset_name,  target_path = 'green_taxi_data_o_file', 'green/unprepared.csv'
datastore_paths = [(datastore, target_path)]
green_taxi_data = Dataset.File.from_files(path=datastore_paths)
green_taxi_data.register(ws, dataset_name)

dataset_name,  target_path = 'yellow_taxi_data_o_file', 'yellow/unprepared.csv'
datastore_paths = [(datastore, target_path)]
yellow_taxi_data = Dataset.File.from_files(path=datastore_paths)
yellow_taxi_data.register(ws, dataset_name)

## Setup compute target

Find the attach name for the Arc enabled  Azure Stack Hub kubernetes cluster in your AzureML workspace to create a ComputeTarget:

attach_name is the attached name for your ASH cluster you setup in [this step](https://github.com/Azure/AML-Kubernetes/blob/master/docs/ASH/AML-ARC-Compute.md)

In [None]:
from azureml.core.compute import KubernetesCompute

attach_name = "<NAME_OF_AML_ATTACHED_COMPUTE_OF_YOUR_ASH_CLUSTER>"
arcK_compute = KubernetesCompute(ws, attach_name)

### Define RunConfig for the compute target
We will also use `pandas`, `scikit-learn`,  `pyarrow` for the pipeline steps. Defining the `runconfig` for that.

In [None]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

# Create a new runconfig object
aml_run_config = RunConfiguration()

# Use the arcK_compute you created above. 
aml_run_config.target = arcK_compute

# Enable Docker
aml_run_config.environment.docker.enabled = True

# Use conda_dependencies.yml to create a conda environment in the Docker image for execution
aml_run_config.environment.python.user_managed_dependencies = False

# Specify CondaDependencies obj, add necessary packages
aml_run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas','scikit-learn'], 
    pip_packages=['azureml-sdk', 'pyarrow'])

print ("Run configuration created.")

## Processing data with pipeline steps

The data process includes following pipeline steps implemented as PythonScriptStep:

1. Cleanse Green Taxi Data
2. Cleanse Yellow Taxi Data
3. Merge cleansed Green and Yellow Taxi Data
4. Filter Taxi Data
5. Normalize Taxi Data
6. Transform Taxi Data
7. Train Test Data Split

### Cleanse Taxi Data

Here a set of "useful" columns for both Green and Yellow taxi data are defined:

In [None]:
useful_columns = str(["cost", "distance", "dropoff_datetime", "dropoff_latitude", 
                      "dropoff_longitude", "passengers", "pickup_datetime", 
                      "pickup_latitude", "pickup_longitude", "store_forward", "vendor"]).replace(",", ";")

#### Cleanse Green taxi data

In [None]:
from azureml.pipeline.core import PipelineData
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
import os

# python scripts folder
prepare_data_folder = './scripts/prepdata'

# rename columns as per Azure Machine Learning NYC Taxi tutorial

green_columns = {
        "vendorID": "vendor",
        "lpepPickupDatetime": "pickup_datetime",
        "lpepDropoffDatetime": "dropoff_datetime",
        "storeAndFwdFlag": "store_forward",
        "pickupLongitude": "pickup_longitude",
        "pickupLatitude": "pickup_latitude",
        "dropoffLongitude": "dropoff_longitude",
        "dropoffLatitude": "dropoff_latitude",
        "passengerCount": "passengers",
        "fareAmount": "cost",
        "tripDistance": "distance"
    }

green_columns_key = str(list(green_columns.keys())).replace(",", ";")
green_columns_value = str(list(green_columns.values())).replace(",", ";")
    
# Define output after cleansing step
dest = (datastore, None)
cleansed_green_data = OutputFileDatasetConfig(name="cleansed_green_data", destination=dest).as_upload(overwrite=False)

print('Cleanse script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# cleansing step creation
# See the cleanse.py for details
cleansingStepGreen = PythonScriptStep(
    name="Cleanse Green Taxi Data",
    script_name="cleanse.py", 
    arguments=["--data-path", green_taxi_data.as_named_input('raw_data').as_mount(),
        "--useful_columns", useful_columns,
               "--columns_key", green_columns_key,
                "--columns_value", green_columns_value,
               "--output_cleanse", cleansed_green_data],
    outputs=[cleansed_green_data],
    compute_target=arcK_compute,
    runconfig=aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("cleansingStepGreen created.")

#### Cleanse Yellow taxi data

In [None]:
yellow_columns = {
    "vendorID": "vendor",
    "tpepPickupDateTime": "pickup_datetime",
    "tpepDropoffDateTime": "dropoff_datetime",
    "storeAndFwdFlag": "store_forward",
    "startLon": "pickup_longitude",
    "startLat": "pickup_latitude",
    "endLon": "dropoff_longitude",
    "endLat": "dropoff_latitude",
    "passengerCount": "passengers",
    "fareAmount": "cost",
    "tripDistance": "distance"
}

yellow_columns_key = str(list(yellow_columns.keys())).replace(",", ";")
yellow_columns_value = str(list(yellow_columns.values())).replace(",", ";")
    
# Define output after cleansing step
dest = (datastore, None)
cleansed_yellow_data = OutputFileDatasetConfig(name="cleansed_yellow_data", destination=dest).as_upload(overwrite=False)

# cleansing step creation
# See the cleanse.py for details about input and output
cleansingStepYellow = PythonScriptStep(
    name="Cleanse Yellow Taxi Data",
    script_name="cleanse.py", 
    arguments=["--data-path", yellow_taxi_data.as_named_input('raw_data').as_mount(),
        "--useful_columns", useful_columns,
               "--columns_key", yellow_columns_key,
                "--columns_value", yellow_columns_value,
               "--output_cleanse", cleansed_yellow_data],
    compute_target=arcK_compute,
    runconfig=aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("cleansingStepYellow created.")

### Merge cleansed Green and Yellow datasets

Create a single data source by merging the cleansed versions of Green and Yellow taxi data.

In [None]:
# Define output after cleansing step
merged_data = OutputFileDatasetConfig(name="merged_data", destination=dest).as_upload(overwrite=False)


print('Merge script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# merging step creation
# See the merge.py for details about input and output
mergingStep = PythonScriptStep(
    name="Merge Taxi Data",
    script_name="merge.py", 
    arguments=["--green_data_path", cleansed_green_data.as_input(),
               "--yellow_data_path", cleansed_yellow_data.as_input(),
        "--output_merge", merged_data],
    compute_target=arcK_compute,
    runconfig=aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("mergingStep created.")

### Filter taxi data

This step filters out coordinates for locations that are outside the city border. We use a TypeConverter object to change the latitude and longitude fields to decimal type. 

In [None]:
# Define output after merging step
filtered_data = OutputFileDatasetConfig(name="filtered_data", destination=dest).as_upload(overwrite=False)

print('Filter script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# filter step creation
# See the filter.py for details about input and output
filterStep = PythonScriptStep(
    name="Filter Taxi Data",
    script_name="filter.py", 
    arguments=["--data-path", merged_data.as_input(),
        "--output_filter", filtered_data],
    compute_target=arcK_compute,
    runconfig = aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("FilterStep created.")

### Normalize taxi data
In this step, pickup and dropoff datetime values are splitted into the respective date and time columns.  Then rename the columns to meaningful names.

In [None]:
# Define output after normalize step
normalized_data = OutputFileDatasetConfig(name="normalized_data", destination=dest).as_upload(overwrite=False)

print('Normalize script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# normalize step creation
# See the normalize.py for details about input and output
normalizeStep = PythonScriptStep(
    name="Normalize Taxi Data",
    script_name="normalize.py", 
    arguments=["--data-path", filtered_data.as_input(),
        "--output_normalize", normalized_data],
    compute_target=arcK_compute,
    runconfig = aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("normalizeStep created.")

### Transform taxi data
Transform the normalized taxi data to final required format. This steps does the following:

- Split the pickup and dropoff date further into the day of the week, day of the month, and month values. 
- To get the day of the week value, uses the derive_column_by_example() function. The function takes an array parameter of example objects that define the input data, and the preferred output. The function automatically determines the preferred transformation. For the pickup and dropoff time columns, split the time into the hour, minute, and second by using the split_column_by_example() function with no example parameter.
- After new features are generated, use the drop_columns() function to delete the original fields as the newly generated features are preferred. 
- Rename the rest of the fields to use meaningful descriptions.

In [None]:
# Define output after transform step
transformed_data = OutputFileDatasetConfig(name="transformed_data", destination=dest).as_upload(overwrite=False)

print('Transform script is in {}.'.format(os.path.realpath(prepare_data_folder)))

# transform step creation
# See the transform.py for details about input and output
transformStep = PythonScriptStep(
    name="Transform Taxi Data",
    script_name="transform.py", 
    arguments=["--data-path", normalized_data.as_input(),
        "--output_transform", transformed_data],
    compute_target=arcK_compute,
    runconfig = aml_run_config,
    source_directory=prepare_data_folder,
    allow_reuse=True
)

print("transformStep created.")

### Split the data into train and test sets

This function segregates the data into dataset for model training and dataset for testing.

In [None]:
train_model_folder = './scripts/trainmodel'

# train and test splits output
output_split_train = OutputFileDatasetConfig(name="output_split_train", destination=dest).as_upload(overwrite=False)
output_split_test = OutputFileDatasetConfig(name="output_split_test", destination=dest).as_upload(overwrite=False)

output_split_test.register_on_complete(name='nyc_taxi_test_set', 
                                       description='test set  from Train Test Data Split')

print('Data spilt script is in {}.'.format(os.path.realpath(train_model_folder)))

# test train split step creation
# See the train_test_split.py for details about input and output
testTrainSplitStep = PythonScriptStep(
    name="Train Test Data Split",
    script_name="train_test_split.py", 
    arguments=["--data-path", transformed_data.as_input(),
        "--output_split_train", output_split_train,
               "--output_split_test", output_split_test],
    compute_target=arcK_compute,
    runconfig = aml_run_config,
    source_directory=train_model_folder,
    allow_reuse=True
)

print("testTrainSplitStep created.")

### Define a custom training step

In [None]:
train_step = PythonScriptStep(
        name="train_step",
        script_name="train_step.py",
        arguments=["--train_data_path", output_split_train.as_input(),
                   "--test_data_path", output_split_test.as_input(),
        ],
      
        compute_target=arcK_compute,
        runconfig=aml_run_config,
        source_directory=train_model_folder,
        allow_reuse=True
    )

print("train_step created.")

## Create an experiement

In [None]:
from azureml.core import Experiment

experiment = Experiment(ws, 'NYCTaxi_Tutorial_Pipelines')

print("Experiment created")

### Build and run the pipeline

In [None]:
from azureml.pipeline.core import Pipeline

pipeline_steps = [train_step]

pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

pipeline_run = experiment.submit(pipeline, regenerate_outputs=False)

print("Pipeline submitted for execution.")

pipeline_run.wait_for_completion()

### Register the model

Register the trained model.

In [None]:
train_step_run = pipeline_run.find_step_run(train_step.name)[0]

registered_model_name='taxi_model'
train_step_run.register_model(model_name=registered_model_name, model_path='outputs/taxi.pkl')

The machine learning model named "taxi_model" should be registered in your AML workspace.

## Test Registered Model

To test the trained model, you can create (or use existing) a AKS cluster for serving the model using AML deployment

In [None]:
from azureml.core import Environment, Workspace, Model, ComputeTarget
from azureml.core.compute import AksCompute
from azureml.core.model import InferenceConfig
from azureml.core.webservice import Webservice, AksWebservice
from azureml.core.compute_target import ComputeTargetException
import numpy as np
import json

### Provision the AKS Cluster

This is a one time setup. You can reuse this cluster for multiple deployments after it has been created. If you delete the cluster or the resource group that contains it, then you would have to recreate it. It may take 5 mins to create a new AKS cluster.

In [None]:
ws = Workspace.from_config()

# Choose a name for your AKS cluster
aks_name = 'aks-service-2'

# Verify that cluster does not exist already
try:
    aks_target = ComputeTarget(workspace=ws, name=aks_name)
    is_new_compute  = False
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # Use the default configuration (can also provide parameters to customize)
    prov_config = AksCompute.provisioning_configuration()

    # Create the cluster
    aks_target = ComputeTarget.create(workspace = ws,
                                    name = aks_name,
                                    provisioning_configuration = prov_config)
    is_new_compute  = True

if aks_target.get_status() != "Succeeded":
    aks_target.wait_for_completion(show_output=True)

### Deploy the model

In [None]:
from azureml.core.conda_dependencies import CondaDependencies
# Enable Docker
env  = Environment("nyc taxi")
env.docker.enabled = True

# Use conda_dependencies.yml to create a conda environment in the Docker image for execution
env.python.user_managed_dependencies = False

# Specify CondaDependencies obj, add necessary packages
env.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['pandas','scikit-learn','numpy==1.19.5'], 
    pip_packages=['azureml-defaults', 'pyarrow'])

inference_config = InferenceConfig(entry_script='score.py', environment=env)
deploy_config = AksWebservice.deploy_configuration()

deployed_model =  registered_model_name 
model = ws.models[deployed_model]

service_name = 'nyctaxiservice1'

service = Model.deploy(workspace=ws,
                       name=service_name,
                       models=[model],
                       inference_config=inference_config,
                       deployment_config=deploy_config,
                       deployment_target=aks_target,
                       overwrite=True)

service.wait_for_deployment(show_output=True)


### Test with inputs

A few intances of test data are stored in test_set.csv for convenience of testing.

In [None]:
import json
import pandas as pd

df_raw = pd.read_csv("test_set.csv")
selected_columns = ['pickup_weekday', 'pickup_hour', 'distance', 'passengers', 'vendor']
df = df_raw[selected_columns][:3]
input_np = df.to_numpy()

instances = json.dumps({"data": input_np.tolist()})

predicted_costs = service.run(instances)

print(predicted_costs)

In [None]:
# Real costs

df_raw["cost"][:3].to_numpy().tolist()

### Delete the newly created cluster

Note: This is important if you wish to avoid the cost of this cluster

In [None]:
if is_new_compute:
    aks_target.delete()

## Next Steps

1. Learn how to [download model then upload to Azure Storage blobs](../AML-model-download-upload.ipynb)
2. Learn how to [inference using KFServing with model in Azure Storage Blobs](https://aka.ms/kfas)