Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.

# Azure Machine Learning and Synapse integration (Private preview)
### Tutorial: Unified experience of big data prep and taxi fare ML prediction

In this tutorial, you use Apache Spark pools backed by Synapse to explore and transform NYC Green dataset (~1M rows per month, [details](https://azure.microsoft.com/en-us/services/open-datasets/catalog/nyc-taxi-limousine-commission-green-taxi-trip-records/) for the open dataset) and leverage automated machine learning in Azure Machine Learning to create a regression model to predict NYC taxi fare prices in one single notebook. 

In this notebook, you learn the following tasks: 
* Link to Synapse workspace to Azure ML
* Attach Synapse Apache Spark pools to Azure ML
* Launch spark sessions and prepare big data with PySpark  
* Train an automated machine learning regression model on AML compute
* Register and deploy the best model 


**Contents**:
* Prerequisites
* Setup
* Link to Synapse workspace and attach Spark pools
* Run machine learning flow from end to end, including data exploration and preparation, traning and model deployment.


### Prerequisites
1. **Get ready in Azure Machine Learning**: 
    * Create Azure Machine Learing in Azure Portal by following the instructions [here](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-workspace). If you already have Azure ML workspace, skip this step.
    * Initiate Azure ML Compute instance (previously called Notebook VM) in order to run sample notebooks by following the [instructions](https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-instance). If you already have running compute instance, skip this step.


2. **Get ready in Azure Synapse**: 
    * Create a Synapse workspace in Azure Portal by following the instruction [here](https://docs.microsoft.com/azure/synapse-analytics/quickstart-create-workspace). When using Azure Synapse, you only pay for the capabilities you opt in to use. During Synapse public preview, there will not be a cost for provisioning an Azure Synapse workspace. Detailed pricing information can be found in this [site](https://azure.microsoft.com/en-us/pricing/details/synapse-analytics/).
    * If you already have Synapse workspace, create Apache Spark pool using Azure Portal, web tools or Synapse Studio. Intructions can be found [ here](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-apache-spark-pool-portal). 
  


Note: in private preview, managed system identity is used to submit pipeline/experiment run. However, user credentials are used for notebook execution (interactively launch Spark session in notebook). Please ensure to grant users permission in Synapse if you are trying out cell execution (sample notebook below): in Synapse Studio, go to Manage tab and then Access Control subtab to add user credentials.   
   

### Setup: packages installation
#### Step1: Install Azure ML packages used in this feature

Note: please ignore the version error. In private preivew. you need to install azureml-core<0.1.10 . In public preview, the step will be removed.

In [None]:
pip install -U "azureml-core<0.1.10" --index-url https://azuremlsdktestpypi.azureedge.net/SynapseInAml/ --extra-index-url https://pypi.python.org/simple

In [None]:
!pip install -U azureml-synapse --extra-index-url=https://azuremlsdktestpypi.azureedge.net/SynapseInAml

For JupyterLab, run the additional installation:

In [None]:
!jupyter lab build --minimize=False

#### Step2: Restart the kernel once you complete pip installaction.

### Link to Synapse assets
#### Step1: Attach User Assigned Identity (resource id of UAI can be found in Azure Portal)
To link to Synapse workspace succussfully, grant User Assigned Identity synapse admin role in Synapse Studio 

In [None]:
import datetime  
from azureml.core import Workspace, Experiment, Dataset, Environment,Datastore, LinkedWorkspace

# The Azure ML workspace information to be used when launching spark session 
ws = Workspace.from_config()
ws



#### Step2: Register Synaspe worksapce in Azure ML

In [None]:
linked_workspace = LinkedWorkspace.register(
    workspace = ws,              
    name = '<Synapse workspace alias in Azure ML>',    
    linked_workspace_resource_id = '<Synapse workspace resource ID>', # Synapse workspace resource ID can be found in Synapse Studio


# Optional: use unregister() to delink synapse workspace: linked_workspace.unregister()
# Optional: use ws.linked_workspaces['synapse workspace alais'] to get linked workspace content 

####  View all the linked services
There is a MSI (system_assigned_identity_principal_id) created for each linked service. Make sure you grant spark admin role of the synapse workspace to MSI in synapse studio before you submit job. 

In [None]:
LinkedService.list(ws)

#### Step3: Attach Synapse Apache Spark pools as compute target in Azure ML (one-time set up)
Once attached, you can use Spark pools either in notebook or pipeline/experiment run 

In [None]:
from azureml.core.compute import SynapseCompute, ComputeTarget

attach_config = SynapseCompute.attach_configuration(
        linked_workspace,             #Linked synapse workspace alias
        type="SynapseSpark",          #Type of assets to attach. For private preview, only Apache Spark pools are enabled.
        pool_name="<Synapse Spark pool name>")       #Name of Synapse spark pool 

synapse_compute =ComputeTarget.attach(
        workspace=ws,                
        name='<Synapse pool alias in Azure ML>',         #Alias of attached Synapse Apache Spark pools in Azure ML
        attach_configuration=attach_config)

synapse_compute.wait_for_completion()

# Optional: use ws.compute_targets['Spark pool alias'] to get Spark pool 

### Data processing at scale on Spark pool
#### Step1: start Spark session 

Learn more about spark magic package in Azure ML: Use Spark magic to execute commands on spark pools. The reaminig of this notebook run on selected Azure ML compute instance.

In [None]:
# explanation on how to use synapse spark magic syntax
%synapse?

In [None]:
# Specify spark pool name and Azure ML workspace information to launch spark session. 
# You can find AML workspace name, subscription ID and resource group name from ws defined at the beginning of this notebook
%synapse start -c SynapseSparkPoolAlias -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

After session started, you can check the session's metadata.

In [None]:
%synapse meta

#### Step2: data exploration analysis and preparation on Spark pools
The input data in this sample is from Azure open dataset NYC green taxi trip records. The green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.Read [here](https://azure.microsoft.com/services/open-datasets/catalog/nyc-taxi-limousine-commission-green-taxi-trip-records/) for more information.

In [None]:
%%synapse

import numpy as np
import pyspark
import os
import urllib
import sys
from datetime import datetime
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from azureml.core.run import Run

# print runtime versions
print('****************')
print('Python version: {}'.format(sys.version))
print('Spark version: {}'.format(spark.version))
print('****************')

# initialize logger
run = Run.get_context()

# start Spark session
spark = pyspark.sql.SparkSession.builder.appName('NYCGreenTaxi')\
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [None]:
%%synapse

# Retrieve data from Azure ML open dataset
from azureml.opendatasets import NycTlcGreen

end_date = parser.parse('2018-06-01')
start_date = parser.parse('2018-05-01')
nyc_green = NycTlcGreen(start_date=start_date, end_date=end_date)
nyc_green_df = nyc_green.to_spark_dataframe()

# Print schema of input data
print("Schema of the input data:")
nyc_green_df.printSchema()

# Print statistical summary for predicted Y value - total amount for trips
print("Statistics summary for Total Amount:")
nyc_green_df.describe("totalAmount").show()

# View Spark job progress in the table below. You can also view status and logs in Spark UI from table blow.

In [None]:
%%synapse

# Drop columns that are not relavant to ML modeling
columns_to_drop = ['vendorID','pickupLongitude','pickupLatitude','dropoffLongitude','dropoffLatitude','lpepPickupDatetime','lpepDropoffDatetime','puLocationId','doLocationId','rateCodeID','storeAndFwdFlag','paymentType','fareAmount','ehailFee','extra','mtaTax','improvementSurcharge','tipAmount','tollsAmount','puYear','puMonth']
df = nyc_green_df.drop(*columns_to_drop)

# Transform column tripType
df_t = df.withColumn('tripType', when(df.tripType==2,lit('0')).otherwise(df.tripType))

# Create or replace temp view to prepare for pyspark sql
df_t.createOrReplaceTempView("df_temp")

# Run query by leveraging pyspark sql 
sqlDF = spark.sql("""
    SELECT * 
    FROM df_temp 
    WHERE  (tripDistance>=25 and tripDistance<50)
    AND (passengerCount>0 and totalAmount>0)
""")

# Data exploration and transformation is completed. Print processed data sample.
print("Reading for machine learning")
sqlDF.show(10)


In [None]:
%%synapse
# Output process data to storage accounts. Below sqlDF is writted as delta table to ADLS Gen2. 
sqlDF.write.format("delta").save("abfss://containername@storageaccountpath/foldername/")

#you can also use sqlDF.write.parquet("abfss://containername@storageaccountpath/foldername/",mode='overwrite')

#### step3: Stop Spark session
When current session reach the status timeout, dead or any failure, you must explicitly stop it before start new one.

In [None]:
%synapse stop

### Train model on Azure ML compute

#### Retrieve processed data from ADLS gen2 storage (intermediate storage account for processed data)

**Register ADLS Gen 2 as Azure ML datastore**: Skip this step if you already have registered storage accounts in Azure ML. In this tutorial, data output from spark session is stored in the registered ADLS Gen2 storage account.You can retrieve this data and track the lineage by leveraging AML dataset after spark session is ended.

In [None]:
# Upgrade to latest AML SDK packages
pip install --upgrade azureml-sdk

**Attention: please restart kernal once installation completes**

In [None]:
import os

ws = Workspace.from_config()
adlsgen2_datastore_name = '<ADLS gen2 storage account alias>'  #set ADLS Gen2 storage account alias in AML

subscription_id=os.getenv("ADL_SUBSCRIPTION", "<ADLS account subscription ID>") # subscription id of ADLS account
resource_group=os.getenv("ADL_RESOURCE_GROUP", "<ADLS account resource group>") # resource group of ADLS account

account_name=os.getenv("ADLSGEN2_ACCOUNTNAME", "<ADLS account name>") # ADLS Gen2 account name
tenant_id=os.getenv("ADLSGEN2_TENANT", "<tenant id of service principal>") # tenant id of service principal
client_id=os.getenv("ADLSGEN2_CLIENTID", "<client id of service principal>") # client id of service principal
client_secret=os.getenv("ADLSGEN2_CLIENT_SECRET", "<secret of service principal>") # the secret of service principal

adlsgen2_datastore = Datastore.register_azure_data_lake_gen2(
    workspace=ws,
    datastore_name=adlsgen2_datastore_name,
    account_name=account_name, # ADLS Gen2 account name
    filesystem='<filesystem name>', # ADLS Gen2 filesystem
    tenant_id=tenant_id, # tenant id of service principal
    client_id=client_id, # client id of service principal
    client_secret=client_secret) # the secret of service principal

In [None]:
from azureml.core import Workspace, Datastore, Dataset
from azureml.core.experiment import Experiment
from azureml.train.automl import AutoMLConfig

datastore_name = '<ADLS gen2 storage account alias>'
    
# retrieve data via AML datastore
datastore = Datastore.get(ws, datastore_name)
datastore_path = [(datastore, '/data/*.snappy.parquet')]
        
nyc_green = Dataset.Tabular.from_parquet_files(path=datastore_path)

In [None]:
#Register as AML dataset and convert to pandas dataframe
nyc_green_df = nyc_green.register(workspace=ws, name='NYCTaxi_Green_Processed', description='This dataset has been processed and ready for training',create_new_version=True)
final_df = nyc_green_df.to_pandas_dataframe()
final_df.describe()

#### Split the data into train and test sets

In [None]:
# Split the dataset into train and test datasets
train_data, test_data = nyc_green_df.random_split(percentage=0.8, seed=223)
label = "totalAmount"

#### Automatically train a model by using automated machine learning
In this example, autoML is used to identify the best model. You can also build your own ML model.

In [None]:
import logging

automl_settings = {
     "enable_early_stopping": True, 
    "experiment_timeout_hours" : 0.25,
    "max_concurrent_iterations": 4,
    "max_cores_per_iteration": -1,
    "n_cross_validations": 5,
    "primary_metric": 'spearman_correlation',
    "verbosity": logging.INFO
}


automl_config = AutoMLConfig(task='regression',
                             compute_target ='CIdemo',  
                             training_data = train_data,
                             label_column_name = label,
                             **automl_settings)

In [None]:
experiment = Experiment(ws, "taxi-experiment")
remote_run = experiment.submit(automl_config, show_output=True)

#### Explore model results

In [None]:
from azureml.widgets import RunDetails
RunDetails(remote_run).show()

#### Retrieve the best model

In [None]:
best_run, fitted_model = remote_run.get_output()
print(best_run)
print(fitted_model)

### Test ML model

In [None]:
test_data = test_data.to_pandas_dataframe()
y_test = test_data['totalAmount'].fillna(0)
test_data = test_data.drop('totalAmount', 1)
test_data = test_data.fillna(0)


train_data = train_data.to_pandas_dataframe()
y_train = train_data['totalAmount'].fillna(0)
train_data = train_data.drop('totalAmount', 1)
train_data = train_data.fillna(0)


In [None]:
y_pred_train = fitted_model.predict(train_data)
y_residual_train = y_train - y_pred_train

y_pred_test = fitted_model.predict(test_data)
y_residual_test = y_test - y_pred_test

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

from sklearn.metrics import mean_squared_error, r2_score

# Set up a multi-plot chart.
f, (a0, a1) = plt.subplots(1, 2, gridspec_kw = {'width_ratios':[1, 1], 'wspace':0, 'hspace': 0})
f.suptitle('Regression Residual Values', fontsize = 18)
f.set_figheight(6)
f.set_figwidth(16)

# Plot residual values of training set.
a0.axis([0, 360, -100, 100])
a0.plot(y_residual_train, 'bo', alpha = 0.5)
a0.plot([-10,360],[0,0], 'r-', lw = 3)
a0.text(16,170,'RMSE = {0:.2f}'.format(np.sqrt(mean_squared_error(y_train, y_pred_train))), fontsize = 12)
a0.text(16,140,'R2 score = {0:.2f}'.format(r2_score(y_train, y_pred_train)),fontsize = 12)
a0.set_xlabel('Training samples', fontsize = 12)
a0.set_ylabel('Residual Values', fontsize = 12)

# Plot residual values of test set.
a1.axis([0, 90, -100, 100])
a1.plot(y_residual_test, 'bo', alpha = 0.5)
a1.plot([-10,360],[0,0], 'r-', lw = 3)
a1.text(5,170,'RMSE = {0:.2f}'.format(np.sqrt(mean_squared_error(y_test, y_pred_test))), fontsize = 12)
a1.text(5,140,'R2 score = {0:.2f}'.format(r2_score(y_test, y_pred_test)),fontsize = 12)
a1.set_xlabel('Test samples', fontsize = 12)
a1.set_yticklabels([])

plt.show()

In [None]:
%matplotlib inline
test_pred = plt.scatter(y_test, y_pred_test, color='')
test_test = plt.scatter(y_test, y_test, color='g')
plt.legend((test_pred, test_test), ('prediction', 'truth'), loc='upper left', fontsize=8)
plt.show()

### Register best model

In [None]:
description = 'My AutoML Model'

model = best_run.register_model(description = description)

print(best_run.model_id)