In [43]:
from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration
from azureml.widgets import RunDetails

In [44]:
# Name of the linked service looked up with LinkedService.get() further down
synapse_linked = 'synw-dmpbackup-westeu-01p-linked'
# Name under which the Spark pool is attached as a compute target and later
# referenced by the SynapseSparkStep
synapse_compute_name = 'cc-small'
# Value passed as pool_name when building the attach configuration
synapse_pool_name = 'test'

In [45]:
# Workspace handle that is passed to the Azure ML calls in the cells below
ws = Workspace.from_config()

## Retrieve the link between your Azure Synapse Analytics workspace and your Azure Machine Learning workspace

In [46]:
# Print every linked service that is registered on the workspace
for service in LinkedService.list(ws):
    print(f"Service: {service}")

# Look up the Synapse link by the name configured at the top of the notebook
linked_service = LinkedService.get(ws, synapse_linked)

Service: LinkedService(workspace=Workspace.create(name='mlw-dmpbackup-westeu-01p', subscription_id='25a89471-60b3-4c91-b44f-ca49f38e6137', resource_group='rg-dmpbackup-westeu-01p'), name=synw-dmpbackup-westeu-01p-linked, type=LinkedServiceLinkType.synapse, linked_service_resource_id=/subscriptions/25a89471-60b3-4c91-b44f-ca49f38e6137/resourceGroups/rg-dmpbackup-westeu-01p/providers/Microsoft.Synapse/workspaces/synw-dmpbackup-westeu-01p, system_assigned_identity_principal_id=4920cd8e-47e7-4407-8d4b-2dc91a133c2f


## Attach your Apache Spark pool as a compute target for Azure Machine Learning

In [47]:
from azureml.core.compute import SynapseCompute, ComputeTarget


# Describe the attachment: which linked service and which Spark pool to use
attach_config = SynapseCompute.attach_configuration(
    linked_service=linked_service,
    type="SynapseSpark",
    pool_name=synapse_pool_name,
)

# Attach the pool to the workspace under the configured compute name
synapse_compute = ComputeTarget.attach(
    workspace=ws,
    name=synapse_compute_name,
    attach_configuration=attach_config,
)

# Block until the attach operation has finished
synapse_compute.wait_for_completion()

Provisioning operation finished, operation "Succeeded"


## Create a SynapseSparkStep that uses the linked Apache Spark pool

In [57]:
from azureml.core import Dataset, Datastore

# Fetch the datastore registered as 'synapse_datastore'; it is used as the
# destination of the step output configured further down.
# (Alternative kept for reference: datastore = ws.get_default_datastore())
datastore = Datastore.get(ws, datastore_name='synapse_datastore')

In [50]:
# List the names of the datastores registered in the workspace.
# The loop variable is deliberately not called `datastore`: that name holds the
# Datastore object used as the step output destination, and rebinding it here
# would replace it with a plain name string when the cells run top to bottom.
for datastore_name in ws.datastores:
    print(datastore_name)

synapse_datastore
azureml_globaldatasets
workspacefilestore
workspaceartifactstore
workspaceworkingdirectory
workspaceblobstore


In [59]:
from azureml.data import HDFSOutputDatasetConfig

# Output of the Spark step: written to "test-data" on the datastore ...
hdfs_output = HDFSOutputDatasetConfig(destination=(datastore, "test-data"))
# ... and configured to be registered under the name "registered_dataset"
step1_output = hdfs_output.register_on_complete(name="registered_dataset")

In [60]:
from azureml.core.environment import Environment
from azureml.pipeline.steps import SynapseSparkStep

# Environment for the Spark job; the SDK reports that only its
# conda_dependencies are used in a Synapse Spark run.
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core>=1.20.0")

# Pipeline step that runs the preparation script on the attached Spark pool
step_1 = SynapseSparkStep(
    name='synapse-spark',
    # Script to run and the folder it lives in
    source_directory="./code/",
    file='prep-dataset-synapse.py',
    # The script receives the output location through --output_dir
    arguments=["--output_dir", step1_output],
    outputs=[step1_output],
    # Where and in which environment the step runs
    compute_target=synapse_compute_name,
    environment=env,
    # Spark driver / executor sizing
    driver_cores=4,
    driver_memory="7g",
    num_executors=1,
    executor_cores=2,
    executor_memory="7g",
)

only conda_dependencies specified in environment will be used in Synapse Spark run.


In [61]:
from azureml.pipeline.core import Pipeline

# Build a single-step pipeline and submit it as experiment 'synapse-pipeline';
# regenerate_outputs=True is passed so the step runs and produces new outputs.
pipeline = Pipeline(workspace=ws, steps=[step_1])
pipeline_run = pipeline.submit(
    experiment_name='synapse-pipeline',
    regenerate_outputs=True,
)

# Show the run widget, then block while streaming the run output
run_widget = RunDetails(pipeline_run)
run_widget.show()
pipeline_run.wait_for_completion(show_output=True)

Created step synapse-spark [6e5169ec][d0a91f24-f511-44da-818b-4caa56c48c45], (This step will run and generate new outputs)
Submitted PipelineRun ce319e49-5942-47c1-a579-8903ecd1656e
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ce319e49-5942-47c1-a579-8903ecd1656e?wsid=/subscriptions/25a89471-60b3-4c91-b44f-ca49f38e6137/resourcegroups/rg-dmpbackup-westeu-01p/workspaces/mlw-dmpbackup-westeu-01p&tid=041d21aa-b4ab-4ad1-891d-62207b3367ef


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: ce319e49-5942-47c1-a579-8903ecd1656e
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ce319e49-5942-47c1-a579-8903ecd1656e?wsid=/subscriptions/25a89471-60b3-4c91-b44f-ca49f38e6137/resourcegroups/rg-dmpbackup-westeu-01p/workspaces/mlw-dmpbackup-westeu-01p&tid=041d21aa-b4ab-4ad1-891d-62207b3367ef
PipelineRun Status: NotStarted
PipelineRun Status: Running




Expected a StepRun object but received <class 'azureml.core.run.Run'> instead.
This usually indicates a package conflict with one of the dependencies of azureml-core or azureml-pipeline-core.
Please check for package conflicts in your python environment


ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Failed to get metastore properties from the linked service.)",
        "messageParameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Failed to get metastore properties from the linked service.)\",\n        \"messageParameters\": {},\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}

In [None]:
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('../output/predictions', ignore_errors=True)

# Get the run for the first step and download its output.
# get_children() is consumed with next(), so only the first child run is used.
# NOTE(review): no output named 'inferences' is declared in the visible cells;
# step_1's only declared output is step1_output. Confirm the output name.
# NOTE(review): the recorded pipeline run above ended with an
# ActivityFailedException, so there may be no output to download yet.
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='../output/predictions')