## 1. Load libraries

In [10]:
import os
import time

from azure.identity import ClientSecretCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from msrest.authentication import BasicTokenAuthentication
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
from azure.identity import DefaultAzureCredential

## 2. Create information functions

In [11]:
def print_item(group):
    """Print the identifying fields of an Azure object.

    ``name`` and ``id`` are always printed. ``location`` and ``tags`` are
    printed only when the object has those attributes, and when a
    ``properties`` attribute exists it is handed to ``print_properties``.

    :param group: Object exposing at least ``name`` and ``id`` attributes.
    """
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))

    # Optional fields: (label shown to the user, attribute name on the object).
    optional_fields = (("Location", "location"), ("Tags", "tags"))
    for label, attribute in optional_fields:
        if hasattr(group, attribute):
            print("\t{}: {}".format(label, getattr(group, attribute)))

    if hasattr(group, 'properties'):
        print_properties(group.properties)


def print_properties(props):
    """Print the provisioning state of a properties object, if it has one.

    Nothing but the trailing blank lines is printed when ``props`` is falsy,
    lacks a ``provisioning_state`` attribute, or that attribute is falsy.

    :param props: Properties object (may be ``None``).
    """
    # Resolve the state once; None covers both "no props" and "no attribute".
    state = getattr(props, 'provisioning_state', None) if props else None
    if state:
        print("\tProperties:\n\t\tProvisioning State: {}".format(state))
    # Always emitted: separates this item from whatever is printed next.
    print("\n\n")


def print_activity_run_details(activity_run):
    """Print the status of an activity run plus its copy metrics or error.

    For a ``'Succeeded'`` run the ``dataRead``, ``dataWritten`` and
    ``copyDuration`` entries of ``activity_run.output`` are printed. For any
    other status the ``message`` entry of ``activity_run.error`` is printed.

    Missing data is tolerated: if ``output``/``error`` is ``None`` or a key is
    absent, ``None`` is printed for that value instead of raising
    ``TypeError``/``KeyError``. That matters because the error branch is taken
    for every non-succeeded status, not only for failed runs.

    :param activity_run: Object exposing ``status``, ``output`` and ``error``;
        ``output`` and ``error`` are expected to be dicts or ``None``.
    """
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        # Fall back to an empty mapping so a run without output does not crash.
        output = activity_run.output or {}
        print("\tNumber of bytes read: {}".format(output.get('dataRead')))
        print("\tNumber of bytes written: {}".format(output.get('dataWritten')))
        print("\tCopy duration: {}".format(output.get('copyDuration')))
    else:
        # Reached for every non-succeeded status, so error details may be absent.
        error = activity_run.error or {}
        print("\tErrors: {}".format(error.get('message')))

## 3. Add credential wrapper class

In [12]:
class CredentialWrapper(BasicTokenAuthentication):
    """Expose an azure-identity credential through ``BasicTokenAuthentication``.

    The token is obtained by running a ``BearerTokenCredentialPolicy`` against
    a throwaway request and copying the bearer value out of its
    ``Authorization`` header.
    """

    def __init__(self, credential=None, resource_id="https://management.azure.com/.default", **kwargs):
        """Build the wrapper around an azure-identity credential.

        :param credential: Any azure-identity credential; a
            ``DefaultAzureCredential`` is created when ``None`` is given.
        :param str resource_id: Scope passed to the bearer-token policy
            (defaults to the ARM scope, v2 endpoint syntax).
        :param kwargs: Forwarded unchanged to ``BearerTokenCredentialPolicy``.
        """
        # No token yet: it is fetched on demand in set_token().
        super().__init__(None)
        if credential is None:
            credential = DefaultAzureCredential()
        self._policy = BearerTokenCredentialPolicy(credential, resource_id, **kwargs)

    def _make_request(self):
        """Return a placeholder pipeline request for the policy to sign."""
        # The method and URL are dummies; the request is never sent anywhere.
        placeholder = HttpRequest("CredentialWrapper", "https://fakeurl")
        return PipelineRequest(placeholder, PipelineContext(None))

    def set_token(self):
        """Refresh ``self.token`` by letting the bearer-token policy sign a fake request.

        Going through the policy (rather than private helpers) keeps this on
        public API only; per the original author's note it also reuses the
        token caching that azure-core provides.
        """
        fake_request = self._make_request()
        self._policy.on_request(fake_request)
        # Header has the form "Bearer <token>": keep everything after the first space.
        authorization = fake_request.http_request.headers["Authorization"]
        bearer_value = authorization.split(" ", 1)[1]
        self.token = {"access_token": bearer_value}

    def signed_session(self, session=None):
        """Refresh the token, then delegate to the base class ``signed_session``.

        :param session: Optional session object passed through to the base class.
        :return: Whatever the base class ``signed_session`` returns.
        """
        self.set_token()
        return super().signed_session(session)

## 4. Authenticate with Azure

In [17]:
    # Azure subscription ID
# Identifiers and secrets are read from the environment first, so they do not
# have to be typed into (and saved with) the notebook. The literals are only
# fallbacks: replace them if the variables are not exported.
# NOTE: an exported variable takes precedence over the literal next to it.
subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID', 'd388216e-67b8-41b3-9832-d6511a1a70be')

# Resource group and data factory names used by every later cell.
rg_name = 'ADFCookbook'
df_name = 'ADFCookbook-From-Python'

# Service principal credential. The variable names match the ones read by
# azure-identity's EnvironmentCredential.
credential = ClientSecretCredential(
    tenant_id=os.environ.get('AZURE_TENANT_ID', 'enter tenant id'),
    client_id=os.environ.get('AZURE_CLIENT_ID', 'enterclient id'),
    client_secret=os.environ.get('AZURE_CLIENT_SECRET', 'enter client secret id')
)
# Adapt the azure-identity credential to the interface the management client is given.
credentials = CredentialWrapper(credential)

## 5. Create Data Factory

In [18]:
# Management client for the target subscription, using the wrapped credentials.
adf_client = DataFactoryManagementClient(
    credentials,
    subscription_id,
)

# Factory definition: only the region is specified.
df_resource = Factory(location='eastus')

# Submit the definition via create_or_update and show what the service returned.
df = adf_client.factories.create_or_update(
    rg_name,
    df_name,
    df_resource,
)
print_item(df)

	Name: ADFCookbook-From-Python
	Id: /subscriptions/d388216e-67b8-41b3-9832-d6511a1a70be/resourceGroups/adfcookbook/providers/Microsoft.DataFactory/factories/adfcookbook-from-python
	Location: eastus
	Tags: {}


## 6. Create a linked service

In [19]:
# Create an Azure Storage linked service
ls_name = 'ADFCookbookLinkedServicePython'

# IMPORTANT: supply the name and key of your Azure Storage account.
# The connection string is taken from AZURE_STORAGE_CONNECTION_STRING when that
# variable is set, so the account key does not have to be saved in the notebook.
# The literal below is only a fallback: edit it if the variable is not exported.
storage_connection = os.environ.get(
    'AZURE_STORAGE_CONNECTION_STRING',
    'DefaultEndpointsProtocol=https;AccountName=adfcookbookstorage;AccountKey=enter your account key'
)
storage_string = SecureString(value=storage_connection)

ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)

	Name: ADFCookbookLinkedServicePython
	Id: /subscriptions/d388216e-67b8-41b3-9832-d6511a1a70be/resourceGroups/ADFCookbook/providers/Microsoft.DataFactory/factories/ADFCookbook-From-Python/linkedservices/ADFCookbookLinkedServicePython





## 7. Create input dataset

In [20]:
# Name of the input dataset and the blob it points at.
ds_name = 'ADFCookbookDS-Input-Python'
blob_path = 'adfcookbook/input'
blob_filename = 'SalesOrders.txt'

# Reference (by name) to the linked service; reused by the output dataset cell.
ds_ls = LinkedServiceReference(reference_name=ls_name)

# Blob dataset definition: linked service + folder + file.
ds_azure_blob = AzureBlobDataset(
    linked_service_name=ds_ls,
    folder_path=blob_path,
    file_name=blob_filename,
)

# Submit the dataset via create_or_update and print what was returned.
ds = adf_client.datasets.create_or_update(
    rg_name,
    df_name,
    ds_name,
    ds_azure_blob,
)
print_item(ds)

	Name: ADFCookbookDS-Input-Python
	Id: /subscriptions/d388216e-67b8-41b3-9832-d6511a1a70be/resourceGroups/ADFCookbook/providers/Microsoft.DataFactory/factories/ADFCookbook-From-Python/datasets/ADFCookbookDS-Input-Python





## 8. Create output dataset

In [21]:
# Name of the output dataset and the folder it writes to (no file name is set).
dsOut_name = 'ADFCookbookDS-Output-Python'
output_blobpath = 'adfcookbook/output'

# Same linked service reference (ds_ls) as the input dataset.
dsOut_azure_blob = AzureBlobDataset(
    linked_service_name=ds_ls,
    folder_path=output_blobpath,
)

# Submit the dataset via create_or_update and print what was returned.
dsOut = adf_client.datasets.create_or_update(
    rg_name,
    df_name,
    dsOut_name,
    dsOut_azure_blob,
)
print_item(dsOut)

	Name: ADFCookbookDS-Output-Python
	Id: /subscriptions/d388216e-67b8-41b3-9832-d6511a1a70be/resourceGroups/ADFCookbook/providers/Microsoft.DataFactory/factories/ADFCookbook-From-Python/datasets/ADFCookbookDS-Output-Python





## 9. Create a pipeline

In [22]:
# Copy activity: blob source -> blob sink, wired to the two datasets by name.
act_name = 'ADFCookbookCopyData'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(
    name=act_name,
    inputs=[dsin_ref],
    outputs=[dsOut_ref],
    source=blob_source,
    sink=blob_sink,
)

# Pipeline containing just the copy activity; it declares no parameters.
p_name = 'ADFCookbookCopyDataPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
    activities=[copy_activity],
    parameters=params_for_pipeline,
)

# Submit the pipeline via create_or_update and print what was returned.
p = adf_client.pipelines.create_or_update(
    rg_name,
    df_name,
    p_name,
    p_obj,
)
print_item(p)

	Name: ADFCookbookCopyDataPipeline
	Id: /subscriptions/d388216e-67b8-41b3-9832-d6511a1a70be/resourceGroups/ADFCookbook/providers/Microsoft.DataFactory/factories/ADFCookbook-From-Python/pipelines/ADFCookbookCopyDataPipeline


## 10. Create a pipeline run

In [23]:
# Start a run of the pipeline with an empty parameter set; the response's
# run_id is used by the monitoring cell.
run_response = adf_client.pipelines.create_run(
    rg_name,
    df_name,
    p_name,
    parameters={},
)

## 11. Monitor a pipeline run

In [25]:
# A single lookup right after create_run usually reports a non-final status,
# so under "Run All" this cell would not show the outcome. Poll until the run
# reaches a terminal status, giving up after 10 minutes so the cell cannot
# block forever.
terminal_statuses = ('Succeeded', 'Failed', 'Cancelled')
poll_deadline = time.monotonic() + 600

pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)
while pipeline_run.status not in terminal_statuses and time.monotonic() < poll_deadline:
    time.sleep(10)
    pipeline_run = adf_client.pipeline_runs.get(rg_name, df_name, run_response.run_id)

# Prints the last observed status (non-terminal only if the deadline was hit).
print("\n\tPipeline run status: {}".format(pipeline_run.status))


	Pipeline run status: Succeeded
