In [None]:
import os

from azureml.core import Datastore, Experiment, Workspace
from azureml.core.compute import ComputeTarget
from azureml.core.compute import DatabricksCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.databricks import PyPiLibrary
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DatabricksStep

In [1]:
# Create an Azure ML workspace object

In [None]:
# Connect to the Azure ML workspace that will own the pipeline.
# Replace the placeholder strings with your workspace details before running.
ws = Workspace(workspace_name = '<workspace_name>',
               subscription_id = '<subscription_ID>',
               resource_group = '<resource_group_name>')

# One field per line so the workspace identity is easy to check in the output.
# (The original print was indented by one space, which raised IndentationError.)
print('Workspace name: ' + ws.name,
      'Azure region: ' + ws.location,
      'Subscription id: ' + ws.subscription_id,
      'Resource group: ' + ws.resource_group, sep='\n')

In [2]:
# Get (or attach) the Databricks compute target.

In [None]:
# Databricks attach settings. Each value can be overridden by an environment
# variable; otherwise the placeholder default is used.
db_compute_name = os.getenv("DATABRICKS_COMPUTE_NAME", '<name>')  # Databricks compute name
db_resource_group = os.getenv("DATABRICKS_RESOURCE_GROUP", "<resource_group_name>")  # Databricks resource group
db_workspace_name = os.getenv("DATABRICKS_WORKSPACE_NAME", "<workspace_name>")  # Databricks workspace name
db_access_token = os.getenv("DATABRICKS_ACCESS_TOKEN", "<access_token>")  # Databricks access token

try:
    # Reuse the compute target if one with this name is already registered in the workspace.
    databricks_compute = ComputeTarget(workspace = ws, name = db_compute_name)
    print('Compute target already exists')
except ComputeTargetException:
    print('Compute not found, will use below parameters to attach new one')
    print('db_compute_name {}'.format(db_compute_name))
    print('db_resource_group {}'.format(db_resource_group))
    print('db_workspace_name {}'.format(db_workspace_name))
    # The access token is deliberately NOT printed: notebook outputs are saved
    # with the file and would leak the credential.

    config = DatabricksCompute.attach_configuration(
        resource_group = db_resource_group,
        workspace_name = db_workspace_name,
        access_token = db_access_token
    )
    databricks_compute = ComputeTarget.attach(workspace = ws, name = db_compute_name, attach_configuration = config)
    databricks_compute.wait_for_completion(True)

In [3]:
# Register the Azure Blob datastore and connect to it

In [None]:
# Blob storage settings. The container, account and key can be overridden by
# environment variables; the datastore name is the name it is registered under.
blob_datastore_name = '<datastore_name>'  # Name of the datastore in the workspace
container_name = os.getenv("BLOB_CONTAINER", "<container_name>")  # Name of Azure blob container
account_name = os.getenv("BLOB_ACCOUNTNAME", "<account_name>")  # Storage account name
account_key = os.getenv("BLOB_ACCOUNT_KEY", "<account_key>")  # Storage account key

blob_datastore = Datastore.register_azure_blob_container(workspace = ws,
                                                         datastore_name = blob_datastore_name,
                                                         container_name = container_name,
                                                         account_name = account_name,
                                                         account_key = account_key)

# Look the datastore up by the same variable used to register it, so the two
# names cannot drift apart (previously a separate hard-coded literal).
datastore = Datastore.get(ws, datastore_name = blob_datastore_name)
print(datastore.datastore_type)

In [4]:
# Build the pipeline

In [None]:
# Pipeline input: the raw AdventureWorks files under this path in the blob datastore.
input_data = DataReference(
    datastore = blob_datastore,
    data_reference_name = 'input_data',
    path_on_datastore = 'adventureworks/raw_data',
)

# Pipeline outputs stored in the same datastore. output_data1 is written by the
# data-prep step; output_data2 presumably belongs to a later model step (not shown).
output_data1 = PipelineData("cleaned_data", datastore = blob_datastore)
output_data2 = PipelineData("linear_regression_model", datastore = blob_datastore)

In [None]:
# Runs inside the Databricks data-prep notebook, not the Azure ML client:
# dbutils only exists there, so os must be imported in that notebook as well.
import os

# The widget names match the data_reference_name ('input_data') and the
# PipelineData name ('cleaned_data') used to build the pipeline.
# dbutils.widgets.get returns the value directly; the deprecated getArgument
# re-read is unnecessary.
i = dbutils.widgets.get("input_data")
print(i)

o = dbutils.widgets.get("cleaned_data")
print(o)

customers_input = os.path.join(i, 'AdvWorksCusts.csv')
spending_input = os.path.join(i, 'AW_AveMonthSpend.csv')

In [None]:
# Alternative Databricks-notebook cell: mount the input blob location via DBFS
# and read the CSVs through the mount.
import os

MOUNT_POINT = "/mnt/input"

# Widget names are derived from data_reference_name='input_data'; the previous
# "input" / "input_blob_*" names did not match what the pipeline defines.
i = dbutils.widgets.get("input_data")
o = dbutils.widgets.get('cleaned_data')

# Mounting an already-used mount point raises, so check explicitly instead of
# swallowing every exception (which also hid missing widgets or bad secrets).
if any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    print('datastore already mounted')
else:
    myinput_blob_secretname = dbutils.widgets.get("input_data_blob_secretname")
    myinput_blob_config = dbutils.widgets.get("input_data_blob_config")

    dbutils.fs.mount(
        source = i,
        mount_point = MOUNT_POINT,
        extra_configs = {myinput_blob_config: dbutils.secrets.get(scope = "amlscope", key = myinput_blob_secretname)})

# Read through the mount. The storage credentials were only supplied to the
# mount, so reading the raw source URL `i` directly would bypass them.
customers_input = os.path.join(MOUNT_POINT, 'AdvWorksCusts.csv')
spending_input = os.path.join(MOUNT_POINT, 'AW_AveMonthSpend.csv')

In [5]:
# Build the Databricks pipeline step

In [None]:
# Workspace path of the Databricks notebook that performs data preparation
# (passed as notebook_path below; overridable via environment variable).
data_prep_path = os.getenv("DATABRICKS_PYTHON_SCRIPT_PATH", "/Shared/adventure_works_project/data_prep_spark")

# Step 1: run the data-prep notebook on the attached Databricks compute, reading
# input_data and producing output_data1 ("cleaned_data").
# allow_reuse=False forces a fresh run on every submission.
# (The original statement was indented by one space, which raised IndentationError.)
step1 = DatabricksStep(name = "data_preparation",
                       run_name = 'data_preparation',
                       inputs = [input_data],
                       outputs = [output_data1],
                       num_workers = 1,
                       notebook_path = data_prep_path,
                       pypi_libraries = [PyPiLibrary(package = 'scikit-learn')],
                       compute_target = databricks_compute,
                       allow_reuse = False)

In [6]:
# Submit the pipeline to the Azure ML workspace

In [None]:
# Only step1 is defined in this notebook; referencing step2/step3 raised NameError
# on a fresh kernel.
# TODO: append step2/step3 (e.g. the model step writing output_data2) once their cells exist.
steps = [step1]

In [None]:
# `steps` is already a list of steps. Wrapping it again (steps=[steps]) passed a
# nested list to Pipeline.
pipeline = Pipeline(workspace = ws, steps = steps)
print('Pipeline is built')
pipeline.validate()
print('Pipeline validation complete')

# Submit under a named experiment and block until the run finishes.
exp_name = 'webinar_run'
exp = Experiment(ws, exp_name)
pipeline_run = exp.submit(pipeline)
print('Pipeline is submitted for execution')

pipeline_run.wait_for_completion(show_output = False)