In [1]:
import json
import os
import time
from datetime import datetime, timedelta

from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from azure.mgmt.resource import ResourceManagementClient

In [2]:
def print_item(group):
    """Print the name and id of an Azure object, plus any optional details.

    ``location`` and ``tags`` are printed only when the object has those
    attributes; when it has a ``properties`` attribute, that value is handed
    to ``print_properties``.
    """
    print("\tName: {}".format(group.name))
    print("\tId: {}".format(group.id))

    # Optional attributes, in the order they are reported.
    optional_fields = (
        ('location', "\tLocation: {}"),
        ('tags', "\tTags: {}"),
    )
    for attr_name, template in optional_fields:
        if hasattr(group, attr_name):
            print(template.format(getattr(group, attr_name)))

    if hasattr(group, 'properties'):
        print_properties(group.properties)

def print_properties(props):
    """Print the provisioning state held by a properties object, if any.

    Nothing but the trailing blank lines is printed when ``props`` is falsy,
    has no ``provisioning_state`` attribute, or that attribute is falsy.
    """
    state = getattr(props, 'provisioning_state', None) if props else None
    if state:
        print("\tProperties:")
        print("\t\tProvisioning State: {}".format(state))
    # Always emitted, whether or not a state was printed.
    print("\n\n")

def print_activity_run_details(activity_run):
    """Print the status of an activity run and its copy metrics or error.

    Args:
        activity_run: Object exposing ``status``, ``output`` and ``error``.
            ``output`` and ``error`` are treated as dict-like payloads and
            either may be ``None``.

    For a run whose status is ``'Succeeded'`` the ``dataRead``,
    ``dataWritten`` and ``copyDuration`` entries of ``output`` are printed.
    For any other status the ``message`` entry of ``error`` is printed.
    A missing payload or key is reported as ``None`` instead of raising, so
    the function can also be called on runs that carry no error details.
    """
    print("\n\tActivity run details\n")
    print("\tActivity run status: {}".format(activity_run.status))
    if activity_run.status == 'Succeeded':
        # Guard against a missing output payload rather than crashing mid-report.
        output = activity_run.output or {}
        print("\tNumber of bytes read: {}".format(output.get('dataRead')))
        print("\tNumber of bytes written: {}".format(output.get('dataWritten')))
        print("\tCopy duration: {}".format(output.get('copyDuration')))
    else:
        # A run that has not succeeded may carry no error payload at all.
        error = activity_run.error or {}
        print("\tErrors: {}".format(error.get('message')))

In [4]:
# Load local settings (connection strings are read from its 'Values' section
# further down). The context manager closes the file handle once the JSON has
# been parsed, instead of leaving it open for the lifetime of the kernel.
with open('local.settings.json') as settings_file:
    settings = json.load(settings_file)

In [5]:
# Service-principal identity. Values are taken from the standard Azure SDK
# environment variables; the non-secret identifiers fall back to the ids this
# notebook was written against.
subscription_id = os.environ.get('AZURE_SUBSCRIPTION_ID', '16be532f-d207-4d70-966d-c6ba9ad7d410')
client_id = os.environ.get('AZURE_CLIENT_ID', '7803dca6-c96e-421e-bfda-c9299d62386c')
tenant = os.environ.get('AZURE_TENANT_ID', 'e4e34038-ea1f-4882-b6e8-ccd776459ca0')

# SECURITY: the client secret must never be stored in the notebook. It is read
# from the environment only, with no fallback. A secret that was previously
# committed in this cell should be treated as compromised and rotated.
secret = os.environ.get('AZURE_CLIENT_SECRET')
if not secret:
    # NOTE(review): with no secret the credentials cell is expected to fail
    # authentication - confirm the exact error against the SDK in use.
    print("WARNING: AZURE_CLIENT_SECRET is not set; service-principal authentication will fail.")

# Locations for the resource group and the data factory.
rg_params = {'location':'centralus'}
df_params = {'location':'eastus'}

# Source blob (container and file name) and the Azure resource / SQL table names.
blob_path= 'mycontainer'
blob_filename = 'TimeSeriesData.csv'
rg_name = 'DF-BlobAutomation'
df_name = 'automated-df'
table_name = 'automatedcsv'

In [6]:
# Build the service-principal credentials, then the resource-management and
# Data Factory clients, all bound to the same subscription.
credentials = ServicePrincipalCredentials(
    client_id=client_id,
    secret=secret,
    tenant=tenant,
)
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)

## Create Blob Linked Service

In [7]:
# Linked service for the Blob Storage account. The connection string is read
# from the 'Values' section of the loaded settings and wrapped in a SecureString.
ls_name = 'storageLinkedService'
storage_string = SecureString(value=settings['Values']['blob_con_string'])
ls_azure_storage = AzureStorageLinkedService(connection_string=storage_string)
blob_ls = adf_client.linked_services.create_or_update(
    rg_name, df_name, ls_name, ls_azure_storage
)

## Create SQL Linked Service

In [8]:
# Linked service for the Azure SQL database. Its connection string is read
# from the 'Values' section of the loaded settings and wrapped in a SecureString.
sql_ls_name = 'sqlLinkedService'
sql_string = SecureString(value=settings['Values']['sql_ls_string'])
ls_azure_sql = AzureSqlDatabaseLinkedService(connection_string=sql_string)
sql_ls = adf_client.linked_services.create_or_update(
    rg_name, df_name, sql_ls_name, ls_azure_sql
)

## Create Azure Blob Dataset

In [9]:
# Input dataset: a comma-delimited text file with a header row, located at
# blob_path/blob_filename and reached through the storage linked service.
ds_name = 'ds_in'
csv_format = TextFormat(column_delimiter=',', first_row_as_header=True)
ds_ls = LinkedServiceReference(reference_name=ls_name)
ds_azure_blob = AzureBlobDataset(
    linked_service_name=ds_ls,
    folder_path=blob_path,
    file_name=blob_filename,
    format=csv_format,
)
ds = adf_client.datasets.create_or_update(rg_name, df_name, ds_name, ds_azure_blob)

## Create SQL Dataset

In [10]:

# Name of the output dataset and a reference to the SQL linked service it uses.
dsOut_name = 'ds_out'
sql_ls_ref = LinkedServiceReference(reference_name=sql_ls_name)

In [11]:
# Output dataset: the target SQL table, reached through the SQL linked service.
dsOut_azure_sql = AzureSqlTableDataset(
    linked_service_name=sql_ls_ref,
    table_name=table_name,
)
dsOut = adf_client.datasets.create_or_update(
    rg_name, df_name, dsOut_name, dsOut_azure_sql
)

In [13]:
# Display the SQL dataset model as a plain dictionary for inspection.
dsOut_azure_sql.as_dict()

{'linked_service_name': {'type': 'LinkedServiceReference',
  'reference_name': 'sqlLinkedService'},
 'type': 'AzureSqlTable',
 'table_name': 'automatedcsv'}

In [11]:
# Copy activity: reads from the input (blob) dataset and writes to the output
# (SQL table) dataset.
act_name = 'blobToSQL'
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
blob_source = BlobSource()
blob_sink = SqlSink()  # despite the variable name, this is the SQL-side sink
copy_activity = CopyActivity(
    name=act_name,
    inputs=[dsin_ref],
    outputs=[dsOut_ref],
    source=blob_source,
    sink=blob_sink,
)

In [12]:
# Pipeline wrapping the single copy activity; it is created (or updated) in
# the factory and the returned resource is printed.
p_name = 'yashTestPipeLine'
params_for_pipeline = {}  # no pipeline parameters are defined
p_obj = PipelineResource(
    activities=[copy_activity],
    parameters=params_for_pipeline,
)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)

	Name: yashTestPipeLine
	Id: /subscriptions/16be532f-d207-4d70-966d-c6ba9ad7d410/resourceGroups/DF-BlobAutomation/providers/Microsoft.DataFactory/factories/automated-df/pipelines/yashTestPipeLine
