# DataBricksのclusterをAMLにAttachする

In [11]:
from azureml.core import Workspace
from azureml.core.compute import DatabricksCompute, ComputeTarget

ws = Workspace.from_config("./config")

db_resource_group = "dp100"
db_workspace_name = "Databricks01"
# Access Token は DatabricksのUser Settingsから作成する
db_access_token = "dapi855095b836fa289bf1ecb757dcaaeb94"

db_compute_name = "mydbcluster001"
if db_compute_name not in ws.compute_targets:
    attach_config = DatabricksCompute.attach_configuration(
                                    resource_group=db_resource_group,
                                    workspace_name=db_workspace_name,
                                    access_token=db_access_token)

    db_cluster = ComputeTarget.attach(ws, db_compute_name, attach_config)
    db_cluster.wait_for_completion(True)

else:
    db_cluster = ws.compute_targets[db_compute_name]

# 設定を行う

In [102]:
# ------------------------------------------------------
# Run the DatabricksStep as an AzureML Pipeline step
# ------------------------------------------------------
from azureml.core import Workspace

# Access the Workspace
ws = Workspace.from_config("./config")


# -----------------------------------------------------------------
# Create custom environment
# -----------------------------------------------------------------
from azureml.core import Environment
from azureml.core.environment import CondaDependencies

# Create the environment
myenv = Environment(name="MyEnvironment")

# Create the dependencies object
myenv_dep = CondaDependencies.create(conda_packages=['scikit-learn', 'joblib', 'pandas'])

myenv.python.conda_dependencies = myenv_dep

# Register the environment
myenv.register(ws)



# -----------------------------------------------------------------
# Create a compute cluster for pipeline
# -----------------------------------------------------------------
cluster_name = "pipeline-cluster"

# # Provisioning configuration using AmlCompute
from azureml.core.compute import AmlCompute

print("Accessing the compute cluster...")

if cluster_name not in ws.compute_targets:
    print("Creating the compute cluster with name: ", cluster_name)
    compute_config = AmlCompute.provisioning_configuration(
                                      vm_size="STANDARD_D11_V2",
                                      max_nodes=2)

    compute_cluster = AmlCompute.create(ws, cluster_name, compute_config)
    compute_cluster.wait_for_completion()
else:
    compute_cluster = ws.compute_targets[cluster_name]
    print(cluster_name, ", compute cluster found. Using it...")


# -----------------------------------------------------------------
# Create Run Configurations for the steps
# -----------------------------------------------------------------
from azureml.core.runconfig import RunConfiguration
run_config = RunConfiguration()

run_config.target = compute_cluster
run_config.environment = myenv



# -----------------------------------------------------------------
# Attach the Databricks Cluster as an attached compute target
# -----------------------------------------------------------------
from azureml.core.compute    import DatabricksCompute
from azureml.core.compute    import ComputeTarget

# Initialize the attach config parameters
db_resource_group = "dp100"
db_workspace_name = "Databricks01"
# Access Token は DatabricksのUser Settingsから作成する
db_access_token = "dapi855095b836fa289bf1ecb757dcaaeb94"
db_compute_name = "mydbcluster001"


# Attach the Databricks compute target
if db_compute_name not in ws.compute_targets:
    print("Creating attach config for Databricks...")
    attach_config = DatabricksCompute.attach_configuration(
                            resource_group = db_resource_group,
                            workspace_name = db_workspace_name,
                            access_token = db_access_token)
    
    print("Attaching Databricks Cluster to AzureML workspace..")
    db_cluster = ComputeTarget.attach(ws,
                                      db_compute_name,
                                      attach_config)

    db_cluster.wait_for_completion(True)

else:
    print('Compute target already exists')
    db_cluster = ws.compute_targets[db_compute_name]


# -----------------------------------------------------------------
# Create/pass data reference of Input and Output
# -----------------------------------------------------------------
from azureml.data.data_reference import DataReference
from azureml.pipeline.core   import PipelineData

# Create input data reference
# data_store = ws.get_default_datastore()
data_store = ws.datastores.get('azure_sdk_blob01')

input_data = DataReference(datastore = data_store,
                           data_reference_name = 'input')

output_data1 = PipelineData('testdata', datastore=data_store)

Accessing the compute cluster...
pipeline-cluster , compute cluster found. Using it...
Compute target already exists


# Databricksstepを作成する

In [103]:
# Create the Databricks Step
from azureml.pipeline.steps import DatabricksStep
from azureml.core.databricks import PyPiLibrary

scikit_learn = PyPiLibrary(package = 'scikit-learn')
joblib       = PyPiLibrary(package = 'joblib')


# Databricks ⇒ Cluster ⇒ NotebookからPathを確認可能
notebook_path = r"/Users/nakamukaiya@gmail.com/demo001"

db_step01 = DatabricksStep(name = "db_step01",
                           inputs = [input_data],
                           outputs = [output_data1],
                           num_workers = 1,
                           notebook_path = notebook_path,
                           run_name = "db_notebook_demo",
                           compute_target = db_cluster,
                           pypi_libraries = [scikit_learn, joblib],
                           allow_reuse = False) # Trueにすると、過去のrunを再利用できる

In [104]:
# Databricks stepでAIの判定を行い、Storageに結果を保存
# eval stepでそれを読み込んで評価を行う
# -----------------------------------------------------------------
# Create the pipeline step to run python script
# ----------------------------------------------------------------
from azureml.pipeline.steps import PythonScriptStep

eval_step    = PythonScriptStep(name='Evaluate',
                                 source_directory='./script',
                                 script_name='630 - Evaluate.py',
                                 inputs=[output_data1],
                                 runconfig=run_config,
                                 arguments=['--testdata', output_data1])

# -----------------------------------------------------------------
# Build and submit the pipeline
# -----------------------------------------------------------------
from azureml.pipeline.core   import Pipeline
from azureml.core            import Experiment

steps             = [db_step01, eval_step]
new_pipeline      = Pipeline(workspace=ws, steps=steps)
new_pipeline_run  = Experiment(ws, 'DB_Notebook_exp001').submit(new_pipeline)

# Wait for completion
new_pipeline_run.wait_for_completion(show_output=True)

Created step db_step01 [720f1a26][73435178-c871-4eeb-a0e4-c097e7d31a8d], (This step will run and generate new outputs)
Created step Evaluate [61366517][70931347-1ce4-4ab9-9728-2df1d39e7e0e], (This step is eligible to reuse a previous run's output)
Using data reference input for StepId [0e5b692f][5b0dfa0b-94df-4882-add4-43770e66ecb3], (Consumers of this data are eligible to reuse prior runs.)
Submitted PipelineRun af7da592-2d0b-4aa1-8666-fc313786c580
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/af7da592-2d0b-4aa1-8666-fc313786c580?wsid=/subscriptions/3467f739-a57b-4612-9de8-72a6616c01b3/resourcegroups/AzuremlSDKRG00/workspaces/Azureml-SDK-WS01&tid=bcd8db96-8bb9-4f0d-af35-e471bf92c072
PipelineRunId: af7da592-2d0b-4aa1-8666-fc313786c580
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/af7da592-2d0b-4aa1-8666-fc313786c580?wsid=/subscriptions/3467f739-a57b-4612-9de8-72a6616c01b3/resourcegroups/AzuremlSDKRG00/workspaces/Azureml-SDK-WS01&tid=bcd8db96-8bb9-4

Collecting package metadata (repodata.json): ...working... 
done
Solving environment: ...working... 
done

Downloading and Extracting Packages

threadpoolctl-2.2.0  | 16 KB     |            |   0% 
threadpoolctl-2.2.0  | 16 KB     | ########## | 100% 
threadpoolctl-2.2.0  | 16 KB     | ########## | 100% 

mkl-2021.4.0         | 219.1 MB  |            |   0% 
mkl-2021.4.0         | 219.1 MB  | 3          |   4% 
mkl-2021.4.0         | 219.1 MB  | #          |  11% 
mkl-2021.4.0         | 219.1 MB  | #7         |  18% 
mkl-2021.4.0         | 219.1 MB  | ##4        |  24% 
mkl-2021.4.0         | 219.1 MB  | ###        |  31% 
mkl-2021.4.0         | 219.1 MB  | ###7       |  38% 
mkl-2021.4.0         | 219.1 MB  | ####4      |  45% 
mkl-2021.4.0         | 219.1 MB  | #####1     |  51% 
mkl-2021.4.0         | 219.1 MB  | #####7     |  58% 
mkl-2021.4.0         | 219.1 MB  | ######4    |  65% 
mkl-2021.4.0         | 219.1 MB  | #######1   |  71% 
mkl-2021.4.0         | 219.1 MB  | #######7  

[91m

  current version: 4.11.0
  latest version: 22.9.0

Please update conda by running

    $ conda update -n base -c defaults conda


[0m#
# To activate this environment, use
#
#     $ conda activate /azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac
#
# To deactivate an active environment, use
#
#     $ conda deactivate

Removing intermediate container 30b3420e0920
 ---> 6769646da437
Step 9/21 : ENV PATH /azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/bin:$PATH
 ---> Running in 97e43dfbdd95
Removing intermediate container 97e43dfbdd95
 ---> 34264a93b1f1
Step 10/21 : COPY azureml-environment-setup/send_conda_dependencies.py azureml-environment-setup/send_conda_dependencies.py
 ---> ab949bb30087
Step 11/21 : RUN echo "Copying environment context"
 ---> Running in 72d610b215ec
Copying environment context
Removing intermediate container 72d610b215ec
 ---> d8661b296c3d
Step 12/21 : COPY azureml-environment-setup/environment_context.json azureml-environment-setup/environment


StepRun(Evaluate) Execution Summary
StepRun( Evaluate ) Status: Failed

AzureMLCompute job failed.
ExecutionFailed: [REDACTED]
	exit_codes: 1
{"NonCompliant":"Process '/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/bin/python' exited with code 1 and error message 'Execution failed. Process exited with status code 1. Error: Traceback (most recent call last):\n  File \"630 - Evaluate.py\"


ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "{\"NonCompliant\":\"Process '/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/bin/python' exited with code 1 and error message 'Execution failed. Process exited with status code 1. Error: Traceback (most recent call last):\\n  File \\\"630 - Evaluate.py\\\", line 44, in <module>\\n    rfc = joblib.load(obj_file)\\n  File \\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/site-packages/joblib/numpy_pickle.py\\\", line 587, in load\\n    obj = _unpickle(fobj, filename, mmap_mode)\\n  File \\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/site-packages/joblib/numpy_pickle.py\\\", line 506, in _unpickle\\n    obj = unpickler.load()\\n  File \\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\", line 1212, in load\\n    dispatch[key[0]](self)\\n  File \\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\", line 1528, in load_global\\n    klass = self.find_class(module, name)\\n  File \\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\", line 1579, in find_class\\n    __import__(module, level=0)\\nModuleNotFoundError: No module named 'sklearn.ensemble.forest'\\n\\n'. Please check the log file 'user_logs/std_log.txt' for more details.\"}\n{\n  \"code\": \"ExecutionFailed\",\n  \"target\": \"\",\n  \"category\": \"UserError\",\n  \"error_details\": [\n    {\n      \"key\": \"exit_codes\",\n      \"value\": \"1\"\n    }\n  ]\n}",
        "messageParameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"{\\\"NonCompliant\\\":\\\"Process '/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/bin/python' exited with code 1 and error message 'Execution failed. Process exited with status code 1. Error: Traceback (most recent call last):\\\\n  File \\\\\\\"630 - Evaluate.py\\\\\\\", line 44, in <module>\\\\n    rfc = joblib.load(obj_file)\\\\n  File \\\\\\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/site-packages/joblib/numpy_pickle.py\\\\\\\", line 587, in load\\\\n    obj = _unpickle(fobj, filename, mmap_mode)\\\\n  File \\\\\\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/site-packages/joblib/numpy_pickle.py\\\\\\\", line 506, in _unpickle\\\\n    obj = unpickler.load()\\\\n  File \\\\\\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\\\\\", line 1212, in load\\\\n    dispatch[key[0]](self)\\\\n  File \\\\\\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\\\\\", line 1528, in load_global\\\\n    klass = self.find_class(module, name)\\\\n  File \\\\\\\"/azureml-envs/azureml_c24d2aa9d2a47b6687cc1ba0963dfdac/lib/python3.8/pickle.py\\\\\\\", line 1579, in find_class\\\\n    __import__(module, level=0)\\\\nModuleNotFoundError: No module named 'sklearn.ensemble.forest'\\\\n\\\\n'. Please check the log file 'user_logs/std_log.txt' for more details.\\\"}\\n{\\n  \\\"code\\\": \\\"ExecutionFailed\\\",\\n  \\\"target\\\": \\\"\\\",\\n  \\\"category\\\": \\\"UserError\\\",\\n  \\\"error_details\\\": [\\n    {\\n      \\\"key\\\": \\\"exit_codes\\\",\\n      \\\"value\\\": \\\"1\\\"\\n    }\\n  ]\\n}\",\n        \"messageParameters\": {},\n        \"details\": []\n    },\n    \"time\": \"0001-01-01T00:00:00.000Z\"\n}"
    }
}

In [None]:
# Databricks stepのアウトプットはDatabricksのrunから確認したrun_IDのフォルダ名の中に格納されている
# --AZUREML_RUN_ID 5192c843-7a9c-4c25-be32-8e48da275e39