In [6]:
from azure.identity import ClientSecretCredential
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory import operations
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time

In [None]:
def print_item(group):
    """Print an Azure object instance."""
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))
    if hasattr(group, 'location'):
        print("\tLocation: {}".format(group.location))
    if hasattr(group, 'tags'):
        print("\tTags: {}".format(group.tags))
    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print a ResourceGroup properties instance."""
    if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(props.provisioning_state))
    print("\n\n")


def print_activity_run_details(activity_run):
    """Print activity run details."""
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
        print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
        print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
    else:
        print("\tErrors: {}".format(activity_run.error['message']))

## Uses an APP Client and Secret for authentication

This can be used with a security principal or in the case below an App that has contributor role in ADF

In [7]:
import os

rg_name = 'ecolab-rg'
df_name = 'ecolab-adf'

def connect_to_adf():
    subscription_id = os.getenv("SUBSCRIPTION_ID")

    credentials = ClientSecretCredential(
        client_id = 'a888b9fe-38ff-4551-844f-7416e1cbb89f',
        client_secret=os.getenv("ECOLAB_ADF_SP_SECRET"),
        tenant_id=os.getenv("TENANT_ID")
    )
    print(credentials)
    #credentials = DefaultAzureCredential(exclude_interactive_browser_credential=False)

    #resource_client = ResourceManagementClient(credentials, subscription_id)
    adf_client = DataFactoryManagementClient(credentials, subscription_id)
    return adf_client

In [8]:
#adf_client.operations.list()
#operations.PipelinesOperations.list_by_factory( resource_group_name=rg_name, factory_name=df_name )
#o = operations

adf_client = connect_to_adf()


<azure.identity._credentials.client_secret.ClientSecretCredential object at 0x7fa0c42fd700>


In [None]:
#df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
for x in adf_client.factories.configure_factory_repo():
    print(x)

adf_client.factories.models.FactoryRepoUpdate()
#adf_client.factories.configure_factory_repo(location_id= str, factory_repo_update=azure.mgmt.datafactory.models._models_py3.FactoryRepoUpdate, **kwargs: Any) -> azure.mgmt.datafactory.models._models_py3.Factory


In [12]:
ds_name = 'inside'
dsOut_name = 'outside'

act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)


In [21]:

adf_client.operations.models.FactoryRepoConfiguration(account_name="SupportedCustomers",repository_name="ecolab",collaboration_branch="test1",root_folder="main")

for pipeline in adf_client.pipelines.list_by_factory(resource_group_name=rg_name, factory_name=df_name):
    print(pipeline.name)

    #pipe = adf_client.pipelines.create_or_update(resource_group_name=rg_name, factory_name=df_name, pipeline_name=pipeline.name, pipeline=pipeline)
    #print(pipe)

#    adf_client.operations.models.FactoryRepoUpdate

    for p in pipeline.activities:
        if str(type(p)).split(".")[-1].replace("'>","") == 'CopyActivity':
            print(f' this is the retry value of the activity {p.policy.retry}')
            print(f' this is the timeout value of the activity {p.policy.timeout}')
            print(f'this is the amount of seconds to wait between retry {p.policy.retry_interval_in_seconds}')
            p.policy.retry = 1
            p.policy.timeout = "05:00"
            p.policy.retry_interval_in_seconds = 100
            
            pipe_validation_errors = pipeline.validate()
            for pve in pipe_validation_errors:
                print(pve)


            params_for_pipeline = {}                
            p_obj = PipelineResource(activities=[p], parameters=params_for_pipeline)
            adf_client.pipelines.create_or_update(resource_group_name=rg_name, factory_name=df_name, pipeline_name=pipeline.name, pipeline=p_obj)

            
    pipeline.validate()


first-pipeline
 this is the retry value of the activity 1
 this is the timeout value of the activity 05:00
this is the amount of seconds to wait between retry 100


In [None]:
for dataset in adf_client.datasets.list_by_factory(resource_group_name=rg_name, factory_name=df_name):
    ds = adf_client.datasets.create_or_update(resource_group_name=rg_name, factory_name=df_name, dataset_name=dataset.name, dataset=dataset)

    try:
        ls = adf_client.linked_services.get(resource_group_name=rg_name, factory_name=df_name, linked_service_name=ds.properties.linked_service_name.reference_name)
        
        print(ls.properties.url)
        ls.properties.url="https://www.cnn.com"
        #ls.properties.url="https://swapi.dev/api/people"
        ls.validate
    
        

        adf_client.linked_services.create_or_update(resource_group_name=rg_name, factory_name=df_name, linked_service_name=ls.name, linked_service=ls)
    except Exception as e:
        print(e)

In [None]:
adf_client.linked_services.create_or_update()