In [0]:
pip install azure-mgmt-datafactory

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
pip install azure-identity

Collecting azure-identity
  Downloading azure_identity-1.23.0-py3-none-any.whl.metadata (81 kB)
Collecting msal>=1.30.0 (from azure-identity)
  Downloading msal-1.32.3-py3-none-any.whl.metadata (11 kB)
Collecting msal-extensions>=1.2.0 (from azure-identity)
  Downloading msal_extensions-1.3.1-py3-none-any.whl.metadata (7.8 kB)
Downloading azure_identity-1.23.0-py3-none-any.whl (186 kB)
Downloading msal-1.32.3-py3-none-any.whl (115 kB)
Downloading msal_extensions-1.3.1-py3-none-any.whl (20 kB)
Installing collected packages: msal, msal-extensions, azure-identity
Successfully installed azure-identity-1.23.0 msal-1.32.3 msal-extensions-1.3.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Restart the Python process so the packages installed by the pip cells above
# are importable (pip's own output notes that a restart may be needed).
dbutils.library.restartPython()

In [0]:
%python
import os
from typing import Optional, Dict, List, Any

from azure.identity import ClientSecretCredential
from azure.mgmt.datafactory import DataFactoryManagementClient


class ADFService:
    """Service for interacting with Azure Data Factory.

    Holds a ``DataFactoryManagementClient`` together with the resource group
    and factory name that every call is issued against. All configuration is
    read from the ``adf-pipeline`` secret scope when the object is constructed.
    """

    # Secret scope that holds every configuration value read in __init__.
    _SECRET_SCOPE = "adf-pipeline"

    def __init__(self) -> None:
        """Read configuration from the secret scope and build the ADF client.

        Raises:
            EnvironmentError: If a secret lookup raises ``KeyError``.
        """
        scope = self._SECRET_SCOPE
        try:
            self.subscription_id = dbutils.secrets.get(scope=scope, key="subscription-id")
            self.resource_group = dbutils.secrets.get(scope=scope, key="resource-group")
            self.factory_name = dbutils.secrets.get(scope=scope, key="factory-name")
            tenant_id = dbutils.secrets.get(scope=scope, key="fuam-spn-tenant-id")
            client_id = dbutils.secrets.get(scope=scope, key="adf-automation-spn-client-id")
            client_secret = dbutils.secrets.get(scope=scope, key="adf-automation-spn-client-secret")
        except KeyError as e:
            # NOTE(review): confirm which exception dbutils.secrets.get raises for a
            # missing secret; any other exception type propagates unchanged.
            raise EnvironmentError(
                f"Missing required secret in scope '{scope}': {e}"
            ) from e

        credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=client_id,
            client_secret=client_secret,
        )

        self.client = DataFactoryManagementClient(credential, self.subscription_id)

    def trigger_pipeline(self, pipeline_name: str, parameters: Optional[Dict[str, str]] = None) -> str:
        """Trigger an existing Azure Data Factory pipeline.

        Args:
            pipeline_name: Name of the pipeline to run.
            parameters: Optional parameters to pass to the pipeline. ``None``
                is sent as an empty dict.

        Returns:
            The run ID of the triggered pipeline run.

        Raises:
            Exception: Whatever the client raised while creating the run. The
                error is printed first and then re-raised, so a failed trigger
                can never be mistaken for a successful one.
        """
        try:
            run_response = self.client.pipelines.create_run(
                resource_group_name=self.resource_group,
                factory_name=self.factory_name,
                pipeline_name=pipeline_name,
                parameters=parameters or {},
            )
        except Exception as e:
            print(f"Error triggering pipeline '{pipeline_name}': {e}")
            # Re-raise instead of implicitly returning None, which would break
            # the declared ``-> str`` contract and hide the failure from callers.
            raise
        return run_response.run_id

    def get_pipeline_run_status(self, run_id: str) -> Dict[str, Any]:
        """Get the status of a pipeline run.

        Args:
            run_id: The run ID returned by trigger_pipeline.

        Returns:
            Dictionary with the keys ``run_id``, ``pipeline_name``, ``status``,
            ``start_time``, ``end_time``, ``duration_in_ms`` and ``parameters``.
        """
        run_response = self.client.pipeline_runs.get(
            resource_group_name=self.resource_group,
            factory_name=self.factory_name,
            run_id=run_id,
        )
        return {
            "run_id": run_response.run_id,
            "pipeline_name": run_response.pipeline_name,
            "status": run_response.status,
            "start_time": run_response.run_start,
            "end_time": run_response.run_end,
            # A falsy duration (e.g. None) is reported as 0.
            "duration_in_ms": run_response.run_duration or 0,
            "parameters": run_response.parameters,
        }

    def list_pipelines(self) -> List[Dict[str, Any]]:
        """List all pipelines in the Azure Data Factory.

        Returns:
            List of dictionaries, one per pipeline, with the keys ``id``,
            ``name``, ``type``, ``description`` and ``folder_name``
            (``None`` when the pipeline has no folder).
        """
        pipelines = self.client.pipelines.list_by_factory(
            resource_group_name=self.resource_group,
            factory_name=self.factory_name,
        )
        return [
            {
                "id": pipeline.id,
                "name": pipeline.name,
                "type": pipeline.type,
                "description": pipeline.description,
                "folder_name": pipeline.folder.name if pipeline.folder else None,
            }
            for pipeline in pipelines
        ]

    def cancel_pipeline_run(self, run_id: str) -> None:
        """Cancel a pipeline run.

        Args:
            run_id: The run ID of the pipeline run to cancel.
        """
        self.client.pipeline_runs.cancel(
            resource_group_name=self.resource_group,
            factory_name=self.factory_name,
            run_id=run_id,
        )

In [0]:
# Instantiate the service; the constructor reads its settings from the
# secret scope and builds the Data Factory management client.
service = ADFService()

# Bare last expression: shows the client's repr as the cell output.
service.client

<azure.mgmt.datafactory._data_factory_management_client.DataFactoryManagementClient at 0x7f87e6223310>

In [0]:
# Fetch every pipeline in the factory and print a numbered one-line summary of each.
pipelines = service.list_pipelines()

print(f"Found {len(pipelines)} pipelines:")
for i, pipeline in enumerate(pipelines, 1):
    folder = f" (folder: {pipeline['folder_name']})" if pipeline['folder_name'] else ""
    # Descriptions can contain newlines and trailing whitespace; collapse all
    # whitespace runs so each pipeline stays on a single numbered line.
    description = " ".join((pipeline['description'] or "").split())
    desc = f" - {description}" if description else ""
    print(f"{i}. {pipeline['name']}{folder}{desc}")

Found 11 pipelines:
1. pl_orchestration_recipe_1
2. pl_orchestration_recipe_5_child
3. pl_orchestration_recipe_4
4. pl_orchestration_recipe_2
5. pl_orchestration_recipe_3
6. pl_orchestration_recipe_5_parent
7. pl_api_ingestion_to_synapse - Pipeline to ingest from and API and sink to Synapse

8. pl_orchestration_recipe_7_trigger
9. plRunPythonOnSHIR
10. plDICOM_Ingest - Ingest DICOM files from source VM
11. plCopyData - Example Pipeline to Copy Data from one location to another.

Parameterize Source and Sink based on input


In [0]:
# Trigger a run of the copy pipeline with its source file and path parameters.
pipeline_name = 'plCopyData'
parameters = {'file': 'SalesLTProduct.csv', 'sourcePath': 'wwi'}

run_id = service.trigger_pipeline(pipeline_name, parameters)

# Fail the cell rather than report success when no run ID came back.
if run_id is None:
    raise RuntimeError(f"Pipeline '{pipeline_name}' was not triggered: no run ID was returned.")

print(f"Pipeline run triggered with ID: {run_id}")

Pipeline run triggered with ID: aecb6bd0-4085-11f0-af6b-7c1e52cc27eb
