Copyright (c) Microsoft Corporation. All rights reserved. 

Licensed under the MIT License.

# Using Synapse Spark Pool as a Compute Target from Azure Machine Learning Remote Run
1. To use Synapse Spark Pool as a compute target from Experiment Run, [ScriptRunConfig](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.script_run_config.scriptrunconfig?view=azure-ml-py) is used, the same as other Experiment Runs. This notebook demonstrates how to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster.
2. To use Synapse Spark Pool as a compute target from [Azure Machine Learning Pipeline](https://aka.ms/pl-concept), a [SynapseSparkStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.synapse_spark_step.synapsesparkstep?view=azure-ml-py) is used. This notebook demonstrates how to leverage SynapseSparkStep in Azure Machine Learning Pipeline.

## Before you begin:
1. **Create an Azure Synapse workspace**: see [Quickstart: create a Synapse workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-workspace) for more information.
2. **Create a Spark pool in the Synapse workspace**: see [Quickstart: create an Apache Spark pool](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-apache-spark-pool-portal) for more information.

# Azure Machine Learning and Pipeline SDK-specific imports

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Experiment
from azureml.core import LinkedService, SynapseWorkspaceLinkedServiceConfiguration
from azureml.core.compute import ComputeTarget, AmlCompute, SynapseCompute
from azureml.exceptions import ComputeTargetException
from azureml.data import HDFSOutputDatasetConfig
from azureml.core.datastore import Datastore
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep, SynapseSparkStep

# Report which azureml-core SDK release this kernel is running.
print(f"SDK version: {azureml.core.VERSION}")

In [None]:
# Connect to the Azure ML workspace described by the local workspace config
# (see Workspace.from_config); `ws` is used by every later cell.
ws = Workspace.from_config()

# Link Synapse workspace to AML 
You must be an "Owner" of the Synapse workspace resource to perform the linking. You can check your role in the Azure portal; if you don't have the "Owner" role, ask an "Owner" to link the workspaces for you.

In [None]:
# Replace with your resource info before running.
# Each setting is read from an environment variable; "....." is only a
# placeholder default and is not a usable resource identifier.
_PLACEHOLDER = "....."
synapse_subscription_id = os.getenv("SYNAPSE_SUBSCRIPTION_ID", _PLACEHOLDER)
synapse_resource_group = os.getenv("SYNAPSE_RESOURCE_GROUP", _PLACEHOLDER)
synapse_workspace_name = os.getenv("SYNAPSE_WORKSPACE_NAME", _PLACEHOLDER)
synapse_linked_service_name = os.getenv("SYNAPSE_LINKED_SERVICE_NAME", _PLACEHOLDER)

# Describe the Synapse workspace to link ...
synapse_link_config = SynapseWorkspaceLinkedServiceConfiguration(
    subscription_id=synapse_subscription_id,
    resource_group=synapse_resource_group,
    name=synapse_workspace_name,
)

# ... and register the link in the AML workspace under the chosen name.
linked_service = LinkedService.register(
    workspace=ws,
    name=synapse_linked_service_name,
    linked_service_config=synapse_link_config,
)

# Linked service property

A managed service identity (MSI) is generated for each linked service. Its principal ID appears as `system_assigned_identity_principal_id` in the linked service properties displayed by the cell below.

#### Before you submit a job, make sure you grant the generated workspace-linking MSI the "Synapse Apache Spark Administrator" role of the Synapse workspace in Synapse Studio.

In [None]:
# Display the registered linked service as the cell output; its properties
# include the system_assigned_identity_principal_id that needs the Synapse role.
linked_service

# Attach the Synapse Spark pool as an AML compute target

In [None]:
synapse_spark_pool_name = os.getenv("SYNAPSE_SPARK_POOL_NAME", ".....")
synapse_compute_name = os.getenv("SYNAPSE_COMPUTE_NAME", ".....")

# Point at a Spark pool reachable through the linked Synapse workspace.
attach_config = SynapseCompute.attach_configuration(
    linked_service,
    type="SynapseSpark",
    pool_name=synapse_spark_pool_name,
)

# Register that pool as an AML compute target, then block until attaching finishes.
synapse_compute = ComputeTarget.attach(
    workspace=ws,
    name=synapse_compute_name,
    attach_configuration=attach_config,
)
synapse_compute.wait_for_completion()

# Start an experiment run

## Prepare data

In [None]:
# Work against the workspace's default blob datastore.
def_blob_store = Datastore(ws, "workspaceblobstore")
print(f"Datastore {def_blob_store.name} will be used")

# Upload the sample CSV from the notebook's working directory so it can be
# referenced as a data source; an existing blob is not overwritten.
file_name = "Titanic.csv"
local_csv_path = f"./{file_name}"
def_blob_store.upload_files(files=[local_csv_path], overwrite=False)


## File dataset as input

In [None]:
from azureml.core import Dataset

# Reference the uploaded CSV on the default datastore as a FileDataset.
titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])

# Expose it to the Spark script as the named input "file_input", delivered as an HDFS path.
# NOTE: `input` shadows the Python builtin; the name is kept because the
# ScriptRunConfig cell refers to it.
named_file_input = titanic_file_dataset.as_named_input("file_input")
input = named_file_input.as_hdfs()

## Output config: the output will be registered as a File dataset



In [None]:
from azureml.data import HDFSOutputDatasetConfig

# Spark writes under "prepared-dataset" on the default blob store; on
# completion the output is registered as the dataset "titanic-final".
prepared_output_config = HDFSOutputDatasetConfig(destination=(def_blob_store, "prepared-dataset"))
output = prepared_output_config.register_on_complete(name="titanic-final")

## Dataprep script

In [None]:
import pandas as pd

# Peek at the first rows of the local CSV before handing it to Spark.
df = pd.read_csv("./Titanic.csv")
df.head(n=5)

In [None]:
# Create the folder that the %%writefile cells below write their scripts into;
# exist_ok=True lets this cell be re-run without raising.
os.makedirs("code", exist_ok=True)

In [None]:
%%writefile code/dataprep.py
import os
import sys
import azureml.core
from azureml.core import Run, Dataset
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import  *
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


print(azureml.core.VERSION)
# Log only the environment variable *names*: the full os.environ mapping may
# contain credentials/tokens that would otherwise be persisted in the run logs.
print(sorted(os.environ))

import argparse

# Both paths are supplied by the submitting notebook (ScriptRunConfig or
# SynapseSparkStep arguments); fail fast with a usage message if one is missing.
parser = argparse.ArgumentParser(description="Titanic data preparation on Synapse Spark.")
parser.add_argument("--file_input", required=True,
                    help="HDFS path of the input CSV file dataset.")
parser.add_argument("--output_dir", required=True,
                    help="Directory the prepared CSV is written to.")
args = parser.parse_args()

# Alternative (not used here): load the input as a tabular dataset through the
# AML dataset SDK (Dataset.get_by_id(...).to_spark_dataframe()).

# Read the file dataset directly from its HDFS path, treating the first row as a header.
spark = SparkSession.builder.getOrCreate()
sdf = spark.read.csv(args.file_input, header="true")

print("Raw df:")
sdf.show(5)

# Column types as read, then a statistical summary per column.
sdf.printSchema()
sdf.describe().show()

# Preprocessing
## 1- create new column with title
# Title = the first word in Name that is immediately followed by "."
# (regexp_extract yields an empty string when nothing matches).
# Raw string: "\." in a plain literal is an invalid escape sequence and warns
# on newer Python versions; the pattern value itself is unchanged.
sdf = sdf.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))

# Map each raw honorific to one of the title groups Master/Miss/Mr/Mrs/Rare.
titles_map = {
    'Capt': 'Rare',
    'Col': 'Rare',
    'Don': 'Rare',
    'Dona': 'Rare',
    'Dr': 'Rare',
    'Jonkheer': 'Rare',
    'Lady': 'Rare',
    'Major': 'Rare',
    'Master': 'Master',
    'Miss': 'Miss',
    'Mlle': 'Rare',
    'Mme': 'Rare',
    'Mr': 'Mr',
    'Mrs': 'Mrs',
    'Ms': 'Rare',
    'Rev': 'Rare',
    'Sir': 'Rare',
    'Countess': 'Rare',
}


def impute_title(title):
    """Return the title group for the raw honorific *title*.

    Any value not present in ``titles_map`` falls back to 'Rare'. This covers an
    unseen honorific, the empty string produced when the Name regex does not
    match, and None for a null Name. Previously such values raised KeyError
    inside the Spark UDF and failed the whole job.
    """
    return titles_map.get(title, 'Rare')
# Wrap impute_title as a string-returning Spark UDF and rewrite Title in place.
title_map_func = F.udf(impute_title, StringType())
sdf = sdf.withColumn('Title', title_map_func(F.col('Title')))

## 2- Fill missing values with mean/mode
print("Filling missing values")
sdf = sdf.withColumn("Age", F.col("Age").cast(IntegerType()))
age_mean = round(sdf.select(F.mean("Age")).collect()[0][0])
print("age_mean: ", age_mean)
sdf = sdf.fillna(age_mean, subset=['Age'])

# Mode of Embarked = the most frequent non-null port. Groups must be ordered by
# their count, not by the Embarked value (which would just pick the
# alphabetically last port). Null rows are excluded so "missing" cannot win.
embarked_mode = (
    sdf.filter(F.col("Embarked").isNotNull())
    .groupby("Embarked")
    .count()
    # secondary sort on the port value makes the choice deterministic on ties
    .orderBy(F.col("count").desc(), F.col("Embarked").asc())
    .first()[0]
)
sdf = sdf.fillna(embarked_mode, subset=['Embarked'])

## 3- Keep only the first character of Cabin
print("Modifying cabin column")
cabin_first_char = F.col("Cabin").substr(0, 1)
sdf = sdf.withColumn("Cabin", cabin_first_char)
### missing cabins become 'U' (undefined)
sdf = sdf.na.fill({'Cabin': 'U'})

## 4- Family size = number of parents/children plus siblings/spouses (Parch + SibSp)
print("Creating family column")
family_size = F.col('Parch') + F.col('SibSp')
sdf = sdf.withColumn('Family_size', family_size)

## 5- transform string columns to numeric label indices (<column>_index)
print("Indexing strings")
# StringIndexer is an estimator: hand the unfitted indexers to the Pipeline
# and let pipeline.fit() fit each one. Fitting them up front and then
# "fitting" the already-fitted models again was redundant.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in ["Sex", "Embarked", "Title", "Cabin"]
]
pipeline = Pipeline(stages=indexers)
sdf = pipeline.fit(sdf).transform(sdf)

## 6- Drop unnecessary columns (raw strings now replaced by *_index columns, plus identifiers)
columns_to_drop = ['Sex', 'PassengerId', 'Name', 'Title', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked']
sdf = sdf.drop(*columns_to_drop)

print("Preprocessed df:")
sdf.show(5)
sdf.printSchema()
sdf.describe().show()

# Coalesce to a single partition so the result is written as one CSV part file with a header.
# NOTE(review): "append" adds a new part file on every run; if the output
# destination is a fixed path, re-runs accumulate rows -- confirm this is intended.
(
    sdf.coalesce(1)
    .write.mode("append")
    .option("header", "true")
    .csv(args.output_dir)
)

## Set up Conda dependency for the following Script Run

In [None]:
from azureml.core.environment import CondaDependencies
# Dependency spec for the ScriptRunConfig Spark run: a CondaDependencies object
# with azureml-core pinned, since dataprep.py imports azureml.core (Run, Dataset).
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

## How to leverage ScriptRunConfig to submit an experiment run to an attached Synapse Spark cluster

In [None]:
from azureml.core import Experiment, RunConfiguration, ScriptRunConfig

# A "pyspark" run configuration that targets the attached Synapse compute.
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

# Driver/executor sizing for this Spark run.
spark_settings = {
    "spark.driver.memory": "32g",
    "spark.driver.cores": 2,
    "spark.executor.memory": "32g",
    "spark.executor.cores": 2,
    "spark.executor.instances": 2,
}
for setting_name, setting_value in spark_settings.items():
    run_config.spark.configuration[setting_name] = setting_value

run_config.environment.python.conda_dependencies = conda_dep

# dataprep.py reads --file_input and writes to --output_dir; `input` and
# `output` are the dataset input/output configs defined earlier.
script_run_config = ScriptRunConfig(
    source_directory='./code',
    script='dataprep.py',
    arguments=["--file_input", input, "--output_dir", output],
    run_config=run_config,
)

In [None]:
from azureml.core import Experiment 
# Submit the Spark data-prep run under the "synapse-spark" experiment.
exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
# Bare expression: display the submitted Run object as the cell output.
run

## How to leverage SynapseSparkStep in an AML pipeline to orchestrate data prep step on Synapse Spark and training step on AzureML compute.

In [None]:
# Name of the AmlCompute CPU cluster that runs the pipeline's training step.
cpu_cluster_name = "cpucluster"

# Reuse the cluster when it already exists; provision it otherwise.
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(
        vm_size='Standard_DS3_v2',
        min_nodes=0,
        max_nodes=3,
    )
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
else:
    print('Found existing cluster, use it.')

cpu_cluster.wait_for_completion(show_output=True)

In [None]:
%%writefile code/train_2.py
import glob
import os
import sys
from os import listdir
from os.path import isfile, join
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from azureml.core import Run
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Directory holding the prepared data, supplied via the "step2_input" environment variable.
mypath = os.environ["step2_input"]
# dataprep.py coalesces to one partition, so Spark emits "part-00000*" files.
# List the actual directory (not the literal string "mypath") and keep full
# paths so pandas can open the files regardless of the working directory.
csv_files = sorted(
    join(mypath, f)
    for f in listdir(mypath)
    if f.startswith('part-00000') and isfile(join(mypath, f))
)
if not csv_files:
    raise FileNotFoundError("No 'part-00000' CSV files found in {}".format(mypath))

# start an Azure ML run
run = Run.get_context()

# Concatenate all part files (DataFrame.append was removed in pandas 2.0).
df = pd.concat([pd.read_csv(path) for path in csv_files], ignore_index=True)

df = df.dropna()
# A bare df.head() does nothing in a script; print it so it reaches the logs.
print(df.head())

X = df.drop(['Survived'], axis=1)
y = df.Survived
# split train test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print(X_train.shape, X_test.shape)
print(y_train.shape, y_test.shape)

# train the model
clf = RandomForestClassifier()
clf.fit(X_train, y_train)

# Plot feature importances and attach the figure to the run
feature_imp = pd.Series(clf.feature_importances_, index=X.columns).sort_values(ascending=False)
plt.figure(figsize=(10, 6))
sns.barplot(x=feature_imp, y=feature_imp.index)
plt.xlabel('Feature Importance Score')
plt.ylabel('Features')
plt.title("Visualizing Important Features")
plt.tight_layout()
run.log_image(name='feature_importance.png', plot=plt)

# Get accuracy + Confusion matrix
y_pred = clf.predict(X_test)
accuracy = metrics.accuracy_score(y_test, y_pred)
print("Accuracy: {}".format(accuracy))
# Built-in float: the np.float alias was removed in NumPy 1.24.
run.log('Accuracy', float(accuracy))

print(classification_report(y_test, y_pred))
conf_matrix = confusion_matrix(y_test, y_pred)

plt.figure(figsize=(8, 5))
sns.heatmap(conf_matrix, annot=True)
plt.title('Confusion Matrix')
plt.tight_layout()
run.log_image(name='confusion_matrix.png', plot=plt)


In [None]:
%%writefile code/train.py
import glob
import os
import sys
from os import listdir
from os.path import isfile, join

# Print the contents of every regular file in the step2_input directory,
# i.e. the prepared data downloaded from the Spark step.
input_dir = os.environ["step2_input"]
for entry in listdir(input_dir):
    entry_path = join(input_dir, entry)
    if not isfile(entry_path):
        continue
    with open(entry_path) as handle:
        print(handle.read())


In [None]:
# Datasets over the uploaded CSV. The tabular dataset is not consumed by the steps below.
titanic_tabular_dataset = Dataset.Tabular.from_delimited_files(path=[(def_blob_store, file_name)])
titanic_file_dataset = Dataset.File.from_files(path=[(def_blob_store, file_name)])

# Step 1 reads the CSV via an HDFS path and writes to "prepared-dataset",
# which is registered as "titanic-pipeline-prepared" on completion.
step1_input = titanic_file_dataset.as_named_input("file_input").as_hdfs()
prepared_output_config = HDFSOutputDatasetConfig(destination=(def_blob_store, "prepared-dataset"))
step1_output = prepared_output_config.register_on_complete(name="titanic-pipeline-prepared")
# Step 2 consumes step 1's output, downloaded and named "step2_input".
step2_input = step1_output.as_input("step2_input").as_download()

from azureml.core.environment import Environment

# Python environment for the Spark step; azureml-core is pinned for dataprep.py.
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")

# Data preparation on the attached Synapse Spark pool.
# NOTE(review): executor_cores=1 here, while the ScriptRunConfig run uses
# spark.executor.cores=2 -- confirm the difference is intentional.
step_1 = SynapseSparkStep(
    name='synapse-spark',
    file='dataprep.py',
    source_directory="./code",
    inputs=[step1_input],
    outputs=[step1_output],
    arguments=["--file_input", step1_input, "--output_dir", step1_output],
    compute_target=synapse_compute_name,
    driver_memory="32g",
    driver_cores=2,
    executor_memory="32g",
    executor_cores=1,
    num_executors=2,
    environment=env,
    allow_reuse=True,
)

# Consumer step on the CPU cluster; train.py locates its input through the
# "step2_input" environment variable.
step_2 = PythonScriptStep(
    script_name="train.py",
    arguments=[step2_input],
    inputs=[step2_input],
    compute_target=cpu_cluster_name,
    source_directory="./code",
    allow_reuse=True,
)

# Chain the Spark data-prep step and the CPU step (linked through step1_output -> step2_input).
pipeline = Pipeline(workspace=ws, steps=[step_1, step_2])
# Submit under the "synapse-pipeline" experiment. regenerate_outputs=False does
# not force a full re-run, so steps with allow_reuse=True may reuse prior outputs.
pipeline_run = pipeline.submit('synapse-pipeline', regenerate_outputs=False)