<a href = "https://www.pieriantraining.com"><img src="../PT Centered Purple.png"> </a>

<em style="text-align:center">Copyrighted by Pierian Training</em>

# Azure Data Factory with Python

## Azure Actions Covered

* Creating and listing Azure Data Factories
* Linking resources to an Azure Data Factory
* Linking datasets to an Azure Data Factory
* Setting up and running pipelines

In this lecture, we'll learn how to set up Azure Data Factories and data pipelines with Python.

To begin, we'll need to import our usual libraries as well as any useful environment variables (e.g. AZURE_SUBSCRIPTION_ID). We'll add some new imports as well.

In [1]:
from azure.identity import AzureCliCredential
# New imports for Data Lake Storage and Azure Data Factory
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory import models

from settings import AZURE_SUBSCRIPTION_ID, DEFAULT_LOCATION, DEFAULT_RESOURCE_GROUP, DATA_LAKE_CONNECTION_STRING
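The `settings` module itself isn't shown in this lecture. As a rough sketch, assuming each value is supplied through an environment variable of the same name, it could look like the following. The helper `read_setting` and the fallback values are our own placeholders, not part of the course material.

```python
# settings.py (hypothetical sketch): read configuration from environment variables
import os


def read_setting(name, default=""):
    """Return the environment variable `name`, or `default` if it is unset."""
    return os.environ.get(name, default)


# The fallbacks below are placeholders; replace them with your own values.
AZURE_SUBSCRIPTION_ID = read_setting("AZURE_SUBSCRIPTION_ID")
DEFAULT_LOCATION = read_setting("DEFAULT_LOCATION", "eastus")
DEFAULT_RESOURCE_GROUP = read_setting("DEFAULT_RESOURCE_GROUP", "default-resource-group")
DATA_LAKE_CONNECTION_STRING = read_setting("DATA_LAKE_CONNECTION_STRING")
```

Keeping the subscription ID and connection string in environment variables keeps secrets out of the notebook itself.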

Let's instantiate our credential and a `DataFactoryManagementClient`.

In [2]:
credential = AzureCliCredential()
df_client = DataFactoryManagementClient(credential, AZURE_SUBSCRIPTION_ID)

## Data Factory

We can create a new Azure Data Factory with these parameters:
* `resource_group_name` - Name of resource group under which to create ADF
* `factory_name` - Name for new ADF
* `factory` - Parameters for new ADF, based on the `Factory` object. For the full list of parameters, see the [Factory class](https://learn.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.factory?view=azure-python)

In [5]:
data_factory = df_client.factories.create_or_update(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name='bens-data-factory1234',
    factory=models.Factory(location=DEFAULT_LOCATION)
)
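The same client can also list the factories that already exist. The sketch below relies on the `factories.list_by_resource_group()` operation, which yields `Factory` objects. The helper name `factory_names` is our own, and the stand-in client at the bottom is there only so the snippet runs without Azure credentials.

```python
from types import SimpleNamespace


def factory_names(client, resource_group_name):
    """Return the sorted names of the data factories in a resource group."""
    factories = client.factories.list_by_resource_group(resource_group_name)
    return sorted(factory.name for factory in factories)


# Stand-in client (hypothetical) that mimics the shape of the real one.
fake_client = SimpleNamespace(
    factories=SimpleNamespace(
        list_by_resource_group=lambda resource_group_name: [
            SimpleNamespace(name="bens-data-factory1234"),
            SimpleNamespace(name="another-factory"),
        ]
    )
)

names = factory_names(fake_client, "default-resource-group")
print(names)
```

With the real client, the call would be `factory_names(df_client, DEFAULT_RESOURCE_GROUP)`.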

## Linked Services

In ADF, you can link to the various resources you might need to process data (e.g. data storage). All of these resources have their own models. Some examples are:
* `AzureStorageLinkedService`
* `AzureBlobStorageLinkedService`
* `FtpServerLinkedService`

For the full list of linked service models, see [the LinkedService base class](https://learn.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.linkedservice?view=azure-python)

We'll connect to our data lake storage. First, we'll create the linked service resource object in Python.

In [30]:
ls_dl_store = models.LinkedServiceResource(
    properties=models.AzureStorageLinkedService(
        connection_string=DATA_LAKE_CONNECTION_STRING
    )
)
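A malformed connection string usually only surfaces later, when a pipeline tries to use the linked service. As an optional sanity check, here is a minimal sketch that parses the string first. It assumes an account-key style connection string of `Key=Value` pairs separated by semicolons; the helper names and the example values are hypothetical.

```python
def parse_connection_string(connection_string):
    """Split 'Key=Value;Key=Value' pairs into a dict."""
    parts = {}
    for segment in connection_string.split(";"):
        if not segment.strip():
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only, since keys can end in '=' padding
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Malformed segment: missing '='")
        parts[key.strip()] = value.strip()
    return parts


def missing_keys(connection_string, required=("AccountName", "AccountKey")):
    """Return the required keys that are absent or empty."""
    parts = parse_connection_string(connection_string)
    return [key for key in required if not parts.get(key)]


# Made-up example value; not a real account or key
example = (
    "DefaultEndpointsProtocol=https;AccountName=exampleaccount;"
    "AccountKey=ZmFrZS1rZXk=;EndpointSuffix=core.windows.net"
)
print(missing_keys(example))
```

In the notebook, `missing_keys(DATA_LAKE_CONNECTION_STRING)` should return an empty list. A connection string that authenticates some other way (for example with a shared access signature) would need a different `required` tuple.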

Next, we'll use our data factory client to create the linked service in our factory.

In [31]:
ls = df_client.linked_services.create_or_update(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name='bens-data-factory1234',
    linked_service_name='bens-storage-linked-service-1',
    linked_service=ls_dl_store
)

Let's look at some of the attributes of our returned linked service.

In [17]:
ls.type

'Microsoft.DataFactory/factories/linkedservices'

In [18]:
ls.name

'bens-storage-linked-service-1'

## Datasets

To work with datasets in ADF, we need to create them just like the linked services. Each dataset should also be tied to one of our linked services. The pipeline we're going to create will copy data from one data source to another, so we'll need two datasets.

The `create_or_update()` method for datasets takes the following parameters:
* `resource_group_name` - Name for the resource group
* `factory_name` - Name of the factory for the dataset
* `dataset_name` - Name for our dataset
* `dataset` - Properties for the dataset. These can be represented by a `DatasetResource` object, whose properties will be a `Dataset` object. For a list of all possible dataset classes, see [the Dataset base class](https://learn.microsoft.com/en-us/python/api/azure-mgmt-datafactory/azure.mgmt.datafactory.models.dataset?view=azure-python). Some examples are:
    * `AzureBlobDataset`
    * `AzurePostgreSqlTableDataset`
    * `SnowflakeDataset`

We'll first create our input dataset.

In [33]:
ds_resource = df_client.datasets.create_or_update(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name='bens-data-factory1234',
    dataset_name='income',
    dataset=models.DatasetResource(
        properties=models.AzureBlobDataset(
            linked_service_name=models.LinkedServiceReference(
                type='LinkedServiceReference',
                reference_name=ls.name
            ),
            folder_path='dl-file-system/raw-data',
            file_name='income.csv'
        )
    )
)

Let's look at some of the returned properties.

In [34]:
ds_resource.as_dict()

{'id': '/subscriptions/bf8c33be-e4bb-46c8-871a-85182d913c50/resourceGroups/default-resource-group/providers/Microsoft.DataFactory/factories/bens-data-factory1234/datasets/income',
 'name': 'income',
 'type': 'Microsoft.DataFactory/factories/datasets',
 'etag': '75008a5e-0000-0100-0000-648229060000',
 'properties': {'type': 'AzureBlob',
  'linked_service_name': {'type': 'LinkedServiceReference',
   'reference_name': 'bens-storage-linked-service-1'},
  'folder_path': 'dl-file-system/raw-data',
  'file_name': 'income.csv'}}

Now let's create the output dataset in our ADF.

In [45]:
ds_out_resource = df_client.datasets.create_or_update(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name=data_factory.name,
    dataset_name='income_transformed',
    dataset=models.DatasetResource(
        properties=models.AzureBlobDataset(
            linked_service_name=models.LinkedServiceReference(
                type='LinkedServiceReference',
                reference_name=ls.name
            ),
            folder_path='dl-file-system/raw-data',
            file_name='income_transformed.csv'
        )
    )
)

## Activities, Pipelines, and Pipeline Runs

A pipeline in ADF will consist of one or more activities, so first we'll need to create the activity. In this lecture, we'll have a simple copy activity. The copy activity needs:
* Inputs/sources
* Outputs/sinks

These will come from our datasets.

In [46]:
copy_activity = models.CopyActivity(
    name='copyIncomeData',
    inputs=[models.DatasetReference(type="DatasetReference", reference_name=ds_resource.name)],
    outputs=[models.DatasetReference(type="DatasetReference", reference_name=ds_out_resource.name)],
    source=models.BlobSource(),
    sink=models.BlobSink()
)

Now that we've defined our activity, we can create a new pipeline in our data factory.

In [47]:
pipeline = df_client.pipelines.create_or_update(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name=data_factory.name,
    pipeline_name='bens-pipeline',
    pipeline=models.PipelineResource(
        activities=[copy_activity],
        parameters={}
    )
)

Once it's created, let's run our new pipeline.

In [49]:
pipeline_run = df_client.pipelines.create_run(
    resource_group_name=DEFAULT_RESOURCE_GROUP,
    factory_name=data_factory.name,
    pipeline_name='bens-pipeline',
    parameters={}
)
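`create_run()` returns as soon as the run is queued, so the copy may not have finished yet. One way to wait is to poll `df_client.pipeline_runs.get()` with the `run_id` from the response until the run reaches a terminal status. The sketch below is our own illustration: the helper `wait_for_run`, the polling interval, the timeout, and the stand-in client are assumptions, and the set of terminal statuses should be checked against the ADF documentation.

```python
import time
from types import SimpleNamespace

# Statuses assumed to mean the run has finished
TERMINAL_STATUSES = {"Succeeded", "Failed", "Cancelled"}


def wait_for_run(client, resource_group_name, factory_name, run_id,
                 poll_seconds=5.0, timeout_seconds=600.0):
    """Poll a pipeline run until it reaches a terminal status or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        run = client.pipeline_runs.get(
            resource_group_name=resource_group_name,
            factory_name=factory_name,
            run_id=run_id,
        )
        if run.status in TERMINAL_STATUSES:
            return run.status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run {run_id} is still {run.status} after the timeout")
        time.sleep(poll_seconds)


# Stand-in client (hypothetical) that reports three statuses in sequence,
# so the example runs without Azure credentials.
_statuses = iter(["Queued", "InProgress", "Succeeded"])
fake_client = SimpleNamespace(
    pipeline_runs=SimpleNamespace(
        get=lambda resource_group_name, factory_name, run_id: SimpleNamespace(
            status=next(_statuses)
        )
    )
)

final_status = wait_for_run(
    fake_client, "default-resource-group", "bens-data-factory1234",
    "example-run-id", poll_seconds=0,
)
print(final_status)
```

In the notebook, the equivalent call would be `wait_for_run(df_client, DEFAULT_RESOURCE_GROUP, data_factory.name, pipeline_run.run_id)`.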

To manage our pipelines, we can list all of them by data factory.

In [50]:
for pipe in df_client.pipelines.list_by_factory(DEFAULT_RESOURCE_GROUP, data_factory.name):
    print(pipe)

{'additional_properties': None, 'id': '/subscriptions/bf8c33be-e4bb-46c8-871a-85182d913c50/resourceGroups/default-resource-group/providers/Microsoft.DataFactory/factories/bens-data-factory1234/pipelines/bens-pipeline', 'name': 'bens-pipeline', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': '75001a5f-0000-0100-0000-64822c760000', 'description': None, 'activities': [<azure.mgmt.datafactory.models._models_py3.CopyActivity object at 0x7f3f35775730>], 'parameters': {}, 'variables': None, 'concurrency': None, 'annotations': None, 'run_dimensions': None, 'folder': None, 'policy': None}
{'additional_properties': None, 'id': '/subscriptions/bf8c33be-e4bb-46c8-871a-85182d913c50/resourceGroups/default-resource-group/providers/Microsoft.DataFactory/factories/bens-data-factory1234/pipelines/test-pipeline', 'name': 'test-pipeline', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': '75003a64-0000-0100-0000-6482500c0000', 'description': None, 'activities': [<azure.mgmt.datafac