## Submit pipeline jobs with Spark component (preview)
Please see documentation page: [Submit Spark jobs in Azure Machine Learning](https://learn.microsoft.com/azure/machine-learning/how-to-submit-spark-jobs#spark-component-in-a-pipeline-job) for more details related to the code samples in this notebook.

You can submit a Spark job from:
- an Azure Machine Learning Notebook connected to an Azure Machine Learning compute instance. 
- [Visual Studio Code connected to an Azure Machine Learning compute instance](https://learn.microsoft.com/azure/machine-learning/how-to-set-up-vs-code-remote?tabs=studio).
- your local computer that has [the Azure Machine Learning SDK for Python](https://learn.microsoft.com/python/api/overview/azure/ai-ml-readme) installed.

## Use an attached Synapse Spark pool

You should have an attached Synapse Spark pool available in your workspace. Please see documentation page: [Attach and manage a Synapse Spark pool in Azure Machine Learning](https://learn.microsoft.com/azure/machine-learning/how-to-manage-synapse-spark-pool) for more details.

**Note** - To ensure successful execution of spark job, the identity being used for the Spark job should be assigned **Contributor** and **Storage Blob Data Contributor** roles on the Azure storage account used for data input and output.

### Submit a pipeline job using managed identity
For an attached Synpase Spark pool, managed identity is compute identity of the attached Synapse Spark pool.

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import ManagedIdentityConfiguration
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_1",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(
    description="Sample Pipeline with Spark component",
)
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.identity = ManagedIdentityConfiguration()
    spark_step.compute = "<ATTACHED_SPARK_POOL_NAME>"


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    )
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Titanic-Spark-Pipeline-SDK-1",
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

### Submit a pipeline job using user identity

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import UserIdentityConfiguration
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_2",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(
    description="Sample Pipeline with Spark component",
)
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.identity = UserIdentityConfiguration()
    spark_step.compute = "<ATTACHED_SPARK_POOL_NAME_UAI>"


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    )
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Titanic-Spark-Pipeline-SDK-2",
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

### Submit a pipeline job using default identity
Default identity for an attached Synpase Spark pool is managed identity (compute identity of the attached Synapse Spark pool).

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_3",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(description="Sample Pipeline with Spark component")
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.compute = "<ATTACHED_SPARK_POOL_NAME>"


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    ),
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline, experiment_name="Titanic-Spark-Pipeline-SDK-3"
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

## Use a serverless Spark compute

A serverless Spark compute is available in your workspace by default. Please see [this documentation page](https://learn.microsoft.com/azure/machine-learning/interactive-data-wrangling-with-apache-spark-azure-ml##serverless-spark-compute-in-azure-machine-learning-notebooks) for more information about serverless Spark compute.

**Note** - To ensure successful execution of pipeline job with Spark component, the identity being used for the job should be assigned **Contributor** and **Storage Blob Data Contributor** roles on the Azure storage account used for data input and output.

### Submit a pipeline job using managed identity
For a serverless Spark compute, managed identity is the user-assigned managed identity attached to the Azure Machine Learning workspace. Please see [this documentation page to learn how to attach the user assigned managed identity](https://learn.microsoft.com/azure/machine-learning/how-to-submit-spark-jobs#attach-user-assigned-managed-identity-using-cli-v2) to the Azure Machine Learning workspace.

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import ManagedIdentityConfiguration
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_4",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(
    description="Sample Pipeline with Spark component",
)
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.identity = ManagedIdentityConfiguration()
    spark_step.resources = {
        "instance_type": "Standard_E8S_V3",
        "runtime_version": "3.2.0",
    }


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    )
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Titanic-Spark-Pipeline-SDK-4",
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

### Submit a pipeline job using user identity

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import UserIdentityConfiguration
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_5",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(
    description="Sample Pipeline with Spark component",
)
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.identity = UserIdentityConfiguration()
    spark_step.resources = {
        "instance_type": "Standard_E8S_V3",
        "runtime_version": "3.2.0",
    }


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    )
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Titanic-Spark-Pipeline-SDK-5",
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

### Submit a pipeline job using default identity
Default identity for the serverless Spark compute is user identity.

In [None]:
from azure.ai.ml import MLClient, dsl, spark, Input, Output
from azure.identity import DefaultAzureCredential
from azure.ai.ml.constants import InputOutputModes

subscription_id = "<SUBSCRIPTION_ID>"
resource_group = "<RESOURCE_GROUP>"
workspace = "<AML_WORKSPACE_NAME>"
ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

spark_component = spark(
    name="spark_component_6",
    inputs={
        "titanic_data": Input(type="uri_file", mode="direct"),
    },
    outputs={
        "wrangled_data": Output(type="uri_folder", mode="direct"),
    },
    # The source folder of the component
    code="./src",
    entry={"file": "titanic.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    args="--titanic_data ${{inputs.titanic_data}} --wrangled_data ${{outputs.wrangled_data}}",
)


@dsl.pipeline(
    description="Sample Pipeline with Spark component",
)
def spark_pipeline(spark_input_data):
    spark_step = spark_component(titanic_data=spark_input_data)
    spark_step.inputs.titanic_data.mode = InputOutputModes.DIRECT
    spark_step.outputs.wrangled_data = Output(
        type="uri_folder",
        path="azureml://datastores/workspaceblobstore/paths/data/wrangled/",
    )
    spark_step.outputs.wrangled_data.mode = InputOutputModes.DIRECT
    spark_step.resources = {
        "instance_type": "Standard_E8S_V3",
        "runtime_version": "3.2.0",
    }


pipeline = spark_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path="azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
    )
)

pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="Titanic-Spark-Pipeline-SDK-6",
)

# Wait until the job completes
ml_client.jobs.stream(pipeline_job.name)