In [2]:
import requests
import time
import json
import base64
from string import Template 

# Fill in Collibra Connection info
COLLIBRA_API_URL = 'https://<instanceURL>/rest/2.0'
USERNAME = '<your username>'

# Base64 Encoded password decoded to a UTF8 string
PASSWORD = base64.b64decode('<yourEncodedPassword>'.encode('utf-8')).decode('utf-8')

# JSON Template File Locations
DOMAIN_JSON_FILE = '<specify the location of your Domain Template>'
ASSET_JSON_FILE = '<specify the location of your Asset Template>'
COMPLEXREL_JSON_FILE = '<specify the location of your Complex Relation Template>'

# Gets created/overwritten each run
DYNAMIC_JSON_FILE = '<specify the location of your Dynamic JSON File>'

# Duration in seconds before polling job status again.
WAIT_DURATION = 2

In [3]:
# Read the 3 JSON templates into variables
with open(DOMAIN_JSON_FILE, 'r') as content_file:
    domainTemplate = Template(content_file.read())

with open(ASSET_JSON_FILE, 'r') as content_file:
    assetTemplate = Template(content_file.read())
    
with open(COMPLEXREL_JSON_FILE, 'r') as content_file:
    complexRelTemplate = Template(content_file.read())

# Create Domain Function
def create_domain(domainName, domainType, communityName):
    json = domainTemplate.substitute(insertyourDomainNameVariable=domainName, insertyourDomainTypeVariable=domainType, insertyourCommunityNameVariable=communityName)
    return json

# Create Asset Function
def create_asset(assetName, assetType, domainName, communityName):
    json = assetTemplate.substitute(insertyourAssetNameVariable=assetName, insertyourAssetTypeVariable=assetType, 
                                         insertyourDomainNameVariable=domainName, insertyourCommunityNameVariable=communityName)
    return json

# Create ComplexRel Function
def create_complexRel(assetName1, assetName2, 
                      assetName3, domainName1, domainName2, communityName, transformationLogic):
    json = complexRelTemplate.substitute(insertyourAsset1NameVariable=assetName1, insertyourAsset2NameVariable=assetName2, 
                      insertyourAsset3NameVariable=assetName3, insertyourDomain1NameVariable=domainName1, 
                     insertyourDomain2NameVariable=domainName2, insertyourCommunity1NameVariable=communityName, insertyourTransformationLogicVariable=transformationLogic)

    return json

In [None]:
# Build the Import JSON Request
print('********************* Building Dynamic JSON Template *********************')
jsonList = [create_domain('<your_initials>_CRM', 'Data Asset Domain','Intermediate APIs')]
jsonList.append(create_domain('<your_initials>_Source to Target Mappings', 'Mapping Domain','Intermediate APIs'))
jsonList.append(create_asset('Client Name', 'Data Element','<your_initials>_CRM', 'Intermediate APIs'))
jsonList.append(create_asset('Client ID','Data Element','<your_initials>_CRM', 'Intermediate APIs'))
jsonList.append(create_asset('SQL Server Mapping', 'Mapping Specification', '<your_initials>_Source to Target Mappings', 'Intermediate APIs'))
jsonList.append(create_complexRel('Client Name', 'Client ID', 'SQL Server Mapping', 
                                    '<your_initials>_CRM', '<your_initials>_Source to Target Mappings', 'Intermediate APIs',
                                   '`clientsWithType` AS select `client`.`client_id` AS `id`, concat( concat( `client`.`name`, ' ' ), `client`.`surname` ) AS `fullname`, `client_type`.`value` AS `client_type`, `client`.`email_address` AS `email_address` from ( `client` join `client_type` on (( `client`.`client_type` = `client_type`.`code` )));'))

# Add the outer list brackets
jsonData = json.loads('[' + ','.join(jsonList) + ']')
jsonFormatted = json.dumps(jsonData,  indent=4)
print(jsonFormatted)

dynamicJSONfile = open(DYNAMIC_JSON_FILE, "w")
n = dynamicJSONfile.write(jsonFormatted)
dynamicJSONfile.close()

files = {'file': open(DYNAMIC_JSON_FILE, 'rb')}



In [None]:
# Import the JSON data
# If there is a hand-shake error after running this cell, comment out the files=files line and use the last line instead
# Then rerun all the cells in this application
print('********************* Starting Import *********************')

response = requests.post(
        COLLIBRA_API_URL + '/import/json-job', 
        auth=(USERNAME, PASSWORD),
        files=files
       #files=files, verify=False
    )
response.text

In [8]:
# Read the returned job ID and state
jobId = response.json()['id']
state = response.json()['state']


In [None]:
# Wait for the import job to finish
if state == 'WAITING':
    
    print('Waiting for import job to finish: ' + jobId)
    
    while state != 'COMPLETED' and state !='ERROR':
        response = requests.get(
            COLLIBRA_API_URL + '/jobs/' + jobId, 
            auth=(USERNAME, PASSWORD)
        )
        state = response.json()['state']
        print('Current job state: ' + state)
        time.sleep(WAIT_DURATION)
   
    print('Finished import job: ' + jobId)
    print('********************* ALL DONE *********************') 

else:
    raise ValueError('Collibra import job did not start correctly.')
