# Run a Spark Job on AzureML

This notebook provides an example of how to define and run a job on AzureML using Spark. This notebook is the _control plane_, meaning it creates a connection to the AzureML workspace, defines the job, and submits the job.

**This Jupyter notebook should be run from a compute instance on AzureML, using the `Python 3.10 - SDK v2 (Python 3.10.11)` kernel.**

The files in this `job` subdirectory include:

- A parameterized Python script with PySpark code that is submitted to a Spark cluster

## Create a client connection to the AzureML workspace

The following cells create a connection object called `ml_client` that is connected to the AzureML workspace. Run only the connection cell that matches where you are running this notebook.

In [None]:
from azure.ai.ml import MLClient, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import UserIdentityConfiguration

In [None]:
## Use this when running the control plane from the AzureML Compute Instance
# MLClient.from_config() reads the workspace details from a config.json file.
ml_client = MLClient.from_config(
    credential=DefaultAzureCredential(),
)

In [None]:
## Use this when running the control plane from your laptop
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    workspace_name="prof-azureml",
    subscription_id="21ff0fc0-dd2c-450d-93b7-96eeb3699b22",
    resource_group_name="prof-azureml"
)
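`MLClient.from_config` reads the workspace's `subscription_id`, `resource_group`, and `workspace_name` from a `config.json` file (by default it searches starting from the current directory). If you would rather use the compute-instance cell from your laptop too, one option is to write that file yourself. A minimal sketch; `write_workspace_config` is a hypothetical helper, not part of the SDK:

```python
import json
from pathlib import Path


def write_workspace_config(directory, subscription_id, resource_group, workspace_name):
    """Write a config.json that MLClient.from_config() can read.

    Hypothetical helper: the key names follow the AzureML workspace config format.
    """
    config = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    path = Path(directory) / "config.json"
    path.write_text(json.dumps(config, indent=2))
    return path
```

For example, `write_workspace_config(".", "<SUBSCRIPTION-ID>", "prof-azureml", "prof-azureml")` followed by `MLClient.from_config(credential=DefaultAzureCredential())` keeps the workspace details out of the notebook code.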

## Define the Job

The following cell defines the job. It is an instance of the [Spark class](https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.spark?view=azure-python) and contains the information required to run a job:

- The cluster size
- The script to run
- The parameters for the script

The `pyspark-script-job.py` script is parameterized and accepts the following parameters:

- `input_object_store_base_url`: The base URL of the object store to read from. **Don't forget the trailing slash `/`.** Use:
    - `s3://<BUCKETNAME>/` for SageMaker,
    - `wasbs://<CONTAINER-NAME>@<STORAGE-ACCOUNT>.blob.core.windows.net/` for Azure Blob Storage,
    - or `azureml://datastores/workspaceblobstore/paths/` for AzureML.
- `input_path`: The path to read from
- `output_object_store_base_url`: The base URL of the object store to write to, in the same form as `input_object_store_base_url` (including the trailing slash)
- `output_path`: The path to write to
- `subreddits`: A comma-separated string of subreddit names

The PySpark script reads the raw data from the given object store location (in this case, a single month of data), filters it, and writes the filtered data out. It is designed to process either submissions or comments, not both.
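The source of `pyspark-script-job.py` is not shown here, so the following is only a sketch of how a script with these parameters might parse them with `argparse`. The `--flag` spellings and the helper names `parse_job_args` and `full_uri` are assumptions. It also shows why the base URLs need their trailing slash: the base URL and the path are simply concatenated.

```python
import argparse


def parse_job_args(argv=None):
    # Hypothetical sketch: flag names mirror the parameter list above;
    # the real script may differ.
    parser = argparse.ArgumentParser(description="Filter raw data by subreddit")
    parser.add_argument("--input_object_store_base_url", required=True)
    parser.add_argument("--input_path", required=True)
    parser.add_argument("--output_object_store_base_url", required=True)
    parser.add_argument("--output_path", required=True)
    parser.add_argument("--subreddits", required=True)
    args = parser.parse_args(argv)
    for name in ("input_object_store_base_url", "output_object_store_base_url"):
        if not getattr(args, name).endswith("/"):
            parser.error(f"--{name} must end with a trailing slash '/'")
    # Split the comma-separated string into a clean list of subreddit names.
    args.subreddits = [s.strip() for s in args.subreddits.split(",") if s.strip()]
    return args


def full_uri(base_url, path):
    # Plain concatenation: without the trailing slash the two parts would fuse.
    return base_url + path
```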

For more information about the parameters used in the job definition, [read the documentation](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-submit-spark-jobs?view=azureml-api-2&tabs=sdk#submit-a-standalone-spark-job).



In [None]:
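# Download the spark-nlp jar and save it next to the job code. Do this before submitting a job.
import requests

JAR_URL = "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.5.1.jar"
with requests.get(JAR_URL, stream=True, timeout=300) as response:
    response.raise_for_status()  # fail loudly instead of saving an error page as the .jar
    with open("spark-nlp-assembly-5.5.1.jar", "wb") as f:
        for chunk in response.iter_content(chunk_size=1 << 20):  # stream in 1 MiB chunks
            f.write(chunk)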
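A JAR file is a ZIP archive, so Python's `zipfile.is_zipfile` gives a quick sanity check that the download produced a real jar rather than an error page. A minimal sketch; `check_jar` is a hypothetical helper name:

```python
import zipfile
from pathlib import Path


def check_jar(path):
    """Raise if `path` is missing or is not a valid ZIP/JAR archive; return its size in bytes."""
    jar = Path(path)
    if not jar.is_file():
        raise FileNotFoundError(f"{jar} was not downloaded")
    if not zipfile.is_zipfile(jar):
        raise ValueError(f"{jar} is not a valid JAR (ZIP) archive")
    return jar.stat().st_size


# Usage, after the download cell above:
# print(check_jar("spark-nlp-assembly-5.5.1.jar"))
```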

In [None]:
# Option 1: pin version 2 of the registered `sparknlp-env` environment.
nlp_job_def = spark(
    display_name="test-sparknlp-job",
    code="./",  # this directory is uploaded with the job
    entry={"file": "test-job-sparknlp.py"},
    driver_cores=1,
    driver_memory="7g",
    executor_cores=4,
    executor_memory="7g",
    executor_instances=1,
    resources={
        "instance_type": "Standard_E4S_V3",
        "runtime_version": "3.4",
    },
    environment="azureml:sparknlp-env:2",
    identity=UserIdentityConfiguration(),
)
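To get a feel for the cluster size a definition requests, you can tally its driver and executor settings. The sketch below is plain arithmetic over the values in the cell above (1 driver core, 4 cores per executor, 1 executor, `7g` of memory each), not an SDK call, and it ignores any memory overhead Spark may add on top of these settings. The helper names are hypothetical:

```python
# Hypothetical helpers: Spark-style size suffixes converted to GiB.
UNITS = {"k": 1 / (1024 * 1024), "m": 1 / 1024, "g": 1, "t": 1024}


def to_gib(size):
    """Convert a size string such as '7g' or '512m' to GiB."""
    return float(size[:-1]) * UNITS[size[-1].lower()]


def tally_resources(driver_cores, driver_memory, executor_cores, executor_memory, executor_instances):
    """Total cores and memory (GiB) requested by one driver plus N executors."""
    total_cores = driver_cores + executor_cores * executor_instances
    total_memory = to_gib(driver_memory) + to_gib(executor_memory) * executor_instances
    return total_cores, total_memory


# Values from the job definition above.
print(tally_resources(1, "7g", 4, "7g", 1))  # (5, 14.0)
```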



In [None]:
# Option 2: use the latest registered version of `sparknlp-env`.
# If you run both cells, this definition replaces the one above.
nlp_job_def = spark(
    display_name="test-sparknlp-job",
    code="./",
    entry={"file": "test-job-sparknlp.py"},
    driver_cores=1,
    driver_memory="7g",
    executor_cores=4,
    executor_memory="7g",
    executor_instances=1,
    resources={
        "instance_type": "Standard_E4S_V3",
        "runtime_version": "3.4",
    },
    environment="azureml:sparknlp-env@latest",
    identity=UserIdentityConfiguration(),
)
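Neither definition above passes arguments to its script. For a parameterized script such as `pyspark-script-job.py`, the `spark()` function accepts an `args` string holding the script's command line. A minimal sketch of a hypothetical helper that builds that string from a dictionary; it assumes values contain no whitespace, which holds for URLs and comma-separated subreddit lists:

```python
def build_spark_args(params):
    """Turn {"name": value} into the '--name value ...' string for spark(args=...).

    Hypothetical helper; rejects values containing whitespace rather than quoting them.
    """
    parts = []
    for name, value in params.items():
        value = str(value)
        if any(ch.isspace() for ch in value):
            raise ValueError(f"value for {name!r} contains whitespace: {value!r}")
        parts.append(f"--{name} {value}")
    return " ".join(parts)


args = build_spark_args({
    "input_object_store_base_url": "azureml://datastores/workspaceblobstore/paths/",
    "input_path": "<INPUT-PATH>",
    "output_object_store_base_url": "azureml://datastores/workspaceblobstore/paths/",
    "output_path": "<OUTPUT-PATH>",
    "subreddits": "<SUBREDDIT-1>,<SUBREDDIT-2>",
})
# Then, for example: spark(..., entry={"file": "pyspark-script-job.py"}, args=args)
```

The AzureML documentation also shows `args` referencing declared job inputs and outputs through `${{inputs.<name>}}` and `${{outputs.<name>}}` placeholders, which lets AzureML resolve the data locations instead of hard-coding URLs.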


## Submit the job

The following cell takes the job you defined above and submits it. You can submit more than one job, but remember that each job spins up its own Spark cluster. If you are submitting multiple jobs, consider creating a separate job definition object for each one for clarity.
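Because the filtering script handles either submissions or comments in a single run, processing both means two jobs. One way to keep the definitions separate is to build a fresh set of keyword arguments per dataset and pass each to `spark(**kwargs)`. A sketch; the helper name, display names, and placeholder args are hypothetical, and `environment`/`identity` still need to be added as in the definitions above:

```python
import copy

# Settings shared by every job, copied from the job definitions above.
COMMON = {
    "code": "./",
    "driver_cores": 1,
    "driver_memory": "7g",
    "executor_cores": 4,
    "executor_memory": "7g",
    "executor_instances": 1,
    "resources": {"instance_type": "Standard_E4S_V3", "runtime_version": "3.4"},
}


def job_kwargs_for(dataset, entry_file, args):
    """Return a fresh kwargs dict for one job (hypothetical helper)."""
    kwargs = copy.deepcopy(COMMON)  # deep copy: jobs never share the nested resources dict
    kwargs.update(display_name=f"{dataset}-job", entry={"file": entry_file}, args=args)
    return kwargs


submissions_kwargs = job_kwargs_for("submissions", "pyspark-script-job.py", "<SUBMISSIONS-ARGS>")
comments_kwargs = job_kwargs_for("comments", "pyspark-script-job.py", "<COMMENTS-ARGS>")
# Hypothetical usage:
# submissions_job = ml_client.jobs.create_or_update(spark(**submissions_kwargs, environment=..., identity=...))
```

The deep copy means tweaking one job's `resources` cannot silently change the other's.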

In [13]:
test_job_base_obj = ml_client.jobs.create_or_update(test_job_base)

Uploading azure-project-tutorial (0.56 MBs): 100%|██████████| 563469/563469 [00:00<00:00, 1164863.01it/s]



In [17]:
sparknlp_job = ml_client.jobs.create_or_update(nlp_job_def)

MlException: 

1) One or more fields are invalid

Details: 

(x) Could not parse azureml://sparknlp-env@latest. If providing an ARM id, it should start with a '/'.

Resolutions: 
1) Double-check that all specified parameters are of the correct types and formats prescribed by the ArmResource schema.
If using the CLI, you can also check the full log in debug mode for more details by adding --debug to the end of your command

Additional Resources: The easiest way to author a yaml specification file is using IntelliSense and auto-completion Azure ML VS code extension provides: https://code.visualstudio.com/docs/datascience/azure-machine-learning. To set up VS Code, visit https://docs.microsoft.com/azure/machine-learning/how-to-setup-vs-code
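The error above comes from an environment reference in a form the SDK could not parse (`azureml://sparknlp-env@latest`). The short forms documented for workspace environments are `azureml:<name>:<version>` and `azureml:<name>@latest`. As a sketch, a hypothetical pre-submission check for those two shapes could look like this; it only checks the string, not whether the environment exists, and the allowed name characters are an assumption:

```python
import re

# Hypothetical check: matches only 'azureml:<name>:<version>' and 'azureml:<name>@latest'.
# The character set allowed in names and versions is an assumption.
_ENV_REF = re.compile(
    r"^azureml:(?P<name>[A-Za-z0-9][A-Za-z0-9_.-]*)(?::(?P<version>[A-Za-z0-9_.-]+)|@latest)$"
)


def check_environment_reference(ref):
    """Return (name, version or 'latest'); raise ValueError for any other shape."""
    match = _ENV_REF.match(ref)
    if match is None:
        raise ValueError(
            f"{ref!r} is not of the form 'azureml:<name>:<version>' or 'azureml:<name>@latest'"
        )
    return match.group("name"), match.group("version") or "latest"
```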


In [4]:
job2_object = ml_client.jobs.create_or_update(job2_submissions)

Uploading project-tutorials (0.51 MBs): 100%|██████████| 514798/514798 [00:00<00:00, 1398860.37it/s]



## Get the Job Studio URL

Once you submit the job, you can navigate to it in the AzureML Studio and monitor its progress. You can also monitor jobs through the SDK, but for now just use the Studio. These are unattended jobs, which means you can shut down this notebook and the Compute Instance and the job will still go through its lifecycle:

- Job is submitted
- Job is queued
- Job is run
- Job completes (assuming no errors)
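If you later want to wait on a job from code rather than in the Studio, one option is to poll its status. A minimal sketch, assuming a `get_status` callable that returns the job's current status string (for example `lambda: ml_client.jobs.get(sparknlp_job.name).status`) and assuming `Completed`, `Failed`, and `Canceled` are the terminal states you care about; `wait_for_job` is a hypothetical helper:

```python
import time

TERMINAL_STATES = {"Completed", "Failed", "Canceled"}  # assumed terminal job statuses


def wait_for_job(get_status, poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until a terminal state is reached; return the final status."""
    waited = 0
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"job still {status!r} after {waited} s")
        sleep(poll_seconds)
        waited += poll_seconds
```

The `sleep` parameter is injectable so the loop can be exercised without actually waiting.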

**Each job's Studio URL will be different.**

In [5]:
job1_url = job1_object.studio_url
print(job1_url)

https://ml.azure.com/runs/cyan_lunch_frrhmxy3j0?wsid=/subscriptions/21ff0fc0-dd2c-450d-93b7-96eeb3699b22/resourcegroups/prof-azureml/workspaces/prof-azureml&tid=fd571193-38cb-437b-bb55-60f28d67b643


In [11]:
job2_url = job2_object.studio_url
print(job2_url)

https://ml.azure.com/runs/cool_apple_77y7w93jt5?wsid=/subscriptions/21ff0fc0-dd2c-450d-93b7-96eeb3699b22/resourcegroups/prof-azureml/workspaces/prof-azureml&tid=fd571193-38cb-437b-bb55-60f28d67b643
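The Studio URL encodes where the job lives: the job (run) name is the last path segment, and the `wsid` query parameter holds the workspace's resource path. A small sketch using the standard library's `urllib.parse` on the first URL printed above; `parse_studio_url` is a hypothetical helper, and the layout is inferred from the printed URLs rather than from a documented format:

```python
from urllib.parse import parse_qs, urlparse


def parse_studio_url(url):
    """Split an AzureML Studio run URL into the run name and workspace details."""
    parsed = urlparse(url)
    wsid = parse_qs(parsed.query)["wsid"][0]
    # wsid looks like /subscriptions/<id>/resourcegroups/<rg>/workspaces/<ws>
    parts = wsid.strip("/").split("/")
    fields = dict(zip(parts[0::2], parts[1::2]))
    return {
        "run_name": parsed.path.rstrip("/").split("/")[-1],
        "subscription_id": fields["subscriptions"],
        "resource_group": fields["resourcegroups"],
        "workspace_name": fields["workspaces"],
    }


url = "https://ml.azure.com/runs/cyan_lunch_frrhmxy3j0?wsid=/subscriptions/21ff0fc0-dd2c-450d-93b7-96eeb3699b22/resourcegroups/prof-azureml/workspaces/prof-azureml&tid=fd571193-38cb-437b-bb55-60f28d67b643"
print(parse_studio_url(url)["run_name"])  # cyan_lunch_frrhmxy3j0
```

The run name can then be passed to SDK calls such as `ml_client.jobs.get(...)`.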
