Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.


# Train in Spark on AML Compute
* Initialize a Workspace
* Create an Experiment
* Copy relevant files to the script folder
* Create an Environment from the AzureML PySpark curated environment
* Create a Datastore and DataReference to mount data onto the Spark cluster
* Create an AML run configuration with the PySpark framework
* Configure and run the script on AML Compute configured for Spark


## Prerequisites
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, if you haven't already, go through the [configuration](../../../configuration.ipynb) notebook first to establish your connection to the Azure ML workspace.

In [1]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.23.0


## Initialize Workspace

Initialize a workspace object from persisted configuration.

In [2]:
from azureml.core import Workspace
ws = Workspace.from_config()
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep='\n')

zhenzhuuksouth
zhenzhuuksouth
uksouth
e9b2ec51-5c94-4fa8-809a-dc1e695e4896
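`Workspace.from_config()` reads the workspace details that the configuration notebook saves to a `config.json` file, searching from the current directory by default. If you prefer to create that file by hand, the following is a minimal sketch that writes the three fields it contains. The helper name `write_workspace_config` is hypothetical and not part of the Azure ML SDK.

```python
import json
import os


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json for Workspace.from_config() and return its path (hypothetical helper)."""
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, "config.json")
    # Overwrites any existing config.json in that directory.
    with open(path, "w") as config_file:
        json.dump(config, config_file, indent=4)
    return path
```

For example, call `write_workspace_config(".", "<subscription-id>", "<resource-group>", "<workspace-name>")` with your own values, then run `Workspace.from_config()` from the same folder.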


## Create Experiment


In [3]:
experiment_name = 'data-processing-on-spark'

from azureml.core import Experiment
exp = Experiment(workspace=ws, name=experiment_name)

## View `stackoverflow-data-prep.py`

For convenience, we created a data-preparation script for you. It is printed below as text, but you can also run `%pfile ./stackoverflow-data-prep.py` in a cell to show the file.

In [34]:
with open('stackoverflow-data-prep.py', 'r') as training_script:
    print(training_script.read())

#!/usr/bin/env python
# coding: utf-8

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

import numpy as np
import pyspark
import os
import urllib
import sys
import pandas as pd
import numpy as np
from sklearn.utils import shuffle
import os

from pyspark.sql.functions import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

from azureml.core.run import Run

# initialize logger
run = Run.get_context()

# start Spark session

spark = pyspark.sql.SparkSession.builder.appName('Stackoverflow') \
             .getOrCreate()
           # .config("spark.jars.packages", "com.databricks:spark-xml_2.11:0.6.0") \
           # .config("spark.jars.repositories", "https://mvnrepository.com/artifact/com.databricks/spark-xml") \
            


# print runtime versions
print('****************

## Configure & Run

**Note:** You can use Docker-based execution to run the Spark job on your local computer or on a remote VM. See the `train-in-remote-vm` notebook for an example of how to configure and run in Docker mode on a VM. Make sure you choose a Docker image that has Spark installed, such as `microsoft/mmlspark:0.12`.

### Create or Attach an AML Compute Cluster


In [5]:
from azureml.core import Workspace
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Choose a name for your CPU cluster
cpu_cluster_name = "spark-data-proc"

# Verify that the cluster does not exist already
try:
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D12_V2',
                                                           max_nodes=4, 
                                                           vm_priority="lowpriority",
                                                           idle_seconds_before_scaledown=2400)
    cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

cpu_cluster.wait_for_completion(show_output=True)

Found existing cluster, use it.
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


### Configure Environment

Clone the AzureML PySpark curated environment, add `scikit-learn` as a pip dependency, and add the `spark-xml` JAR package from Maven.

In [6]:
from azureml.core.environment import Environment
from azureml.core import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies

spark_env=Environment.get(workspace=ws, name="AzureML-PySpark-MmlSpark-0.15")
spark_env = spark_env.clone("PySpark-MmlSpark-Alt")

# Create a new set of Conda dependencies for the cloned environment
conda_dep = CondaDependencies()

# Install the scikit-learn pip package
conda_dep.add_pip_package("scikit-learn")

# Assign the dependencies to the Python section of spark_env
spark_env.python.conda_dependencies=conda_dep

# Make the spark-xml Maven package available to the Spark session
spark_env.spark.packages = [{"group": "com.databricks","artifact": "spark-xml_2.11","version": "0.6.0"}]
spark_env.spark.repositories = ["https://mvnrepository.com/artifact/com.databricks/spark-xml"]
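Each entry in `spark_env.spark.packages` describes a Maven package by group, artifact, and version. The sketch below shows how such an entry maps to the `group:artifact:version` coordinate string that Spark's `spark.jars.packages` setting expects, which is the same coordinate as in the commented-out `.config(...)` line of the script. The helper `maven_coordinate` is hypothetical, not an Azure ML SDK function.

```python
def maven_coordinate(package):
    """Join a {'group', 'artifact', 'version'} dict into a Maven coordinate string."""
    return ":".join(package[key] for key in ("group", "artifact", "version"))


packages = [{"group": "com.databricks", "artifact": "spark-xml_2.11", "version": "0.6.0"}]

# spark.jars.packages takes a comma-separated list of coordinates.
jars_packages = ",".join(maven_coordinate(p) for p in packages)
print(jars_packages)  # com.databricks:spark-xml_2.11:0.6.0
```

The `_2.11` suffix in the artifact name is the Scala version the package was built against, so it needs to match the Scala version of the Spark installation in the environment.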



### Create the Spark Run Configuration

Create a run configuration that uses the PySpark framework, targets the cluster created above, and uses the environment configured above.

In [None]:
# use pyspark framework
spark_run_config = RunConfiguration(framework="PySpark")

# Set compute target to the cpu cluster
spark_run_config.target = cpu_cluster.name

# Set environment
spark_run_config.environment = spark_env

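### Create a Datastore and DataReference for Spark

This step assumes a Blob datastore named `stackoverflow_blob` is already registered in the workspace (for example, with `Datastore.register_azure_blob_container`). Calling `as_mount()` on the datastore returns a `DataReference` that mounts the Blob container on the compute nodes, so the Spark job can read the data as local files.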

In [31]:
from azureml.core import Datastore

stackoverflow_datastore = ws.datastores['stackoverflow_blob']
data_ref = stackoverflow_datastore.as_mount()

### Submit the script to AzureML Compute

In [33]:
from azureml.core import ScriptRunConfig, Environment

script_run_config = ScriptRunConfig(source_directory = '.',
                                    script= 'stackoverflow-data-prep.py',
                                    arguments=[str(data_ref)],
                                    run_config = spark_run_config,
                                    )
script_run_config.run_config.data_references[data_ref.data_reference_name] = data_ref.to_config()
run = exp.submit(config=script_run_config)
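The `arguments=[str(data_ref)]` entry passes the data reference to the script on the command line; as we understand it, Azure ML replaces it with the path where the datastore is mounted on the compute node before the script starts. The script's argument handling is not shown above, so this is a minimal sketch that assumes the mount path arrives as the first positional argument. The names `data_path`, `input_file`, and `posts.xml` are hypothetical.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse the command-line arguments passed through ScriptRunConfig(arguments=[...])."""
    parser = argparse.ArgumentParser(description="Stack Overflow data prep (sketch)")
    # Assumption: the mount path is the first (and only) positional argument.
    parser.add_argument("data_path", help="local path where the datastore is mounted")
    return parser.parse_args(argv)


def input_file(argv=None, file_name="posts.xml"):
    """Build the path of a (hypothetical) input file inside the mounted datastore."""
    args = parse_args(argv)
    return os.path.join(args.data_path, file_name)
```

Inside the script, calling `input_file()` with no arguments makes `argparse` read `sys.argv[1:]`, which is where the resolved mount path ends up.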


Monitor the run using a Jupyter widget.

In [12]:
from azureml.widgets import RunDetails
RunDetails(run).show()


Note: if you need to cancel a run, you can follow [these instructions](https://aka.ms/aml-docs-cancel-run).

After the run has finished successfully, you can check the logged metrics.
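The script obtains its `Run` object with `Run.get_context()`, so any values it records with `run.log(...)` can be read back from the submitted run. A minimal sketch, assuming the script logs metrics that way: the helper name `wait_and_get_metrics` is hypothetical, while `wait_for_completion` and `get_metrics` are `Run` methods in the Azure ML SDK v1.

```python
def wait_and_get_metrics(run, show_output=True):
    """Block until an Azure ML run finishes, then return its logged metrics as a dict."""
    run.wait_for_completion(show_output=show_output)
    # Each key is a metric name; a metric logged more than once maps to a list of values.
    return run.get_metrics()


# In the notebook, pass the run returned by exp.submit(...):
# metrics = wait_and_get_metrics(run)
# print(metrics)
```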