Integration with orchestration tools

Integration with Azure Data Factory

To integrate with Azure Data Factory, follow these steps:

  1. Ensure that the pipeline is created and published in Azure Data Factory.
  2. Inside your CI pipeline, deploy the latest job versions and write the deployment result to a file:
dbx deploy --write-specs-to-file=.dbx/deployment-result.json
  3. Reflect the job definitions to Azure Data Factory activities:
dbx datafactory reflect \
    --specs-file=.dbx/deployment-result.json \
    --subscription-name some-subscription \
    --resource-group some-group \
    --factory-name some-factory \
    --name some-pipeline-name

This command will create or update linked services and pipeline activities. Each job will be configured as a separate activity.
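
The two CLI steps above can also be driven from a Python CI script. The following is a minimal sketch, not part of dbx itself: the helper names (`build_deploy_command`, `build_reflect_command`, `deploy_and_reflect`) are hypothetical, it assumes the `dbx` executable is on the PATH, and it uses only the flags shown above.

```python
import subprocess
from typing import List

# Specs file path as passed to `dbx datafactory reflect` above;
# adjust it if your CI writes the deployment result elsewhere.
SPECS_FILE = ".dbx/deployment-result.json"


def build_deploy_command(specs_file: str = SPECS_FILE) -> List[str]:
    """Command for step 2: deploy jobs and record the result in a file."""
    return ["dbx", "deploy", f"--write-specs-to-file={specs_file}"]


def build_reflect_command(
    subscription_name: str,
    resource_group: str,
    factory_name: str,
    pipeline_name: str,
    specs_file: str = SPECS_FILE,
) -> List[str]:
    """Command for step 3: reflect the deployed jobs into an ADF pipeline."""
    return [
        "dbx", "datafactory", "reflect",
        f"--specs-file={specs_file}",
        "--subscription-name", subscription_name,
        "--resource-group", resource_group,
        "--factory-name", factory_name,
        "--name", pipeline_name,
    ]


def deploy_and_reflect(
    subscription_name: str,
    resource_group: str,
    factory_name: str,
    pipeline_name: str,
) -> None:
    """Run both steps in order; check=True stops at the first failing command."""
    subprocess.run(build_deploy_command(), check=True)
    subprocess.run(
        build_reflect_command(
            subscription_name, resource_group, factory_name, pipeline_name
        ),
        check=True,
    )
```

Building the argument lists separately from running them keeps the commands easy to inspect or log in CI before anything is executed, for example with `print(" ".join(build_reflect_command("some-subscription", "some-group", "some-factory", "some-pipeline-name")))`.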


Note the following limitations of this approach:
  • runs triggered from Azure Data Factory won't be listed among the job's runs
  • changing a job definition manually in the Databricks UI won't change the properties of ADF-defined activities
  • only Python-based activities are supported at the moment
  • MSI authentication is not yet supported
  • the policy_id argument is not yet supported (it will be ignored during deployment to ADF)
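
Because `policy_id` is dropped silently, it may help to check job definitions for it before reflecting. The sketch below is a hypothetical pre-flight helper, not a dbx feature: `find_key_paths` and `example_job` are illustrative names, and the example dictionary is made up. The function makes no assumption about the layout of a job definition beyond it being nested dicts and lists, as loaded from JSON.

```python
from typing import Any, Iterator


def find_key_paths(node: Any, key: str, path: str = "") -> Iterator[str]:
    """Yield a dotted path for every occurrence of `key` in nested dicts/lists."""
    if isinstance(node, dict):
        for name, value in node.items():
            child = f"{path}.{name}" if path else name
            if name == key:
                yield child
            # Keep descending: the key may also appear deeper in the structure.
            yield from find_key_paths(value, key, child)
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_key_paths(item, key, f"{path}[{index}]")


# Hypothetical job definition, for illustration only.
example_job = {
    "name": "some-job-name",
    "new_cluster": {"policy_id": "some-policy-id", "num_workers": 2},
}

for hit in find_key_paths(example_job, "policy_id"):
    print(f"policy_id found at '{hit}'; it will be ignored during deployment to ADF")
```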

Integration with Apache Airflow

To trigger job execution from Apache Airflow, do the following:

  • Deploy jobs to Databricks:
dbx deploy
  • Add this function, which looks up a job ID by job name, to your Airflow setup:
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def get_job_id_by_name(job_name: str, databricks_conn_id: str) -> int:
    """Return the ID of the single Databricks job whose name equals job_name."""
    list_endpoint = ('GET', 'api/2.0/jobs/list')
    hook = DatabricksHook(databricks_conn_id=databricks_conn_id)
    # _do_api_call is a private hook method and may change between provider versions
    response_payload = hook._do_api_call(list_endpoint, {})
    all_jobs = response_payload.get("jobs", [])
    matching_jobs = [j for j in all_jobs if j["settings"]["name"] == job_name]

    if not matching_jobs:
        raise Exception(f"Job with name {job_name} not found")

    if len(matching_jobs) > 1:
        raise Exception(f"Job with name {job_name} is duplicated. Please make job name unique in Databricks UI.")

    job_id = matching_jobs[0]["job_id"]
    return job_id
  • Use this function from your DAG:
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

job_id = get_job_id_by_name("some-job-name", "some-databricks-conn-id")
operator = DatabricksRunNowOperator(
    # add your arguments