In [1]:
import requests
from json import loads, dumps
from typing import Callable, List, Dict, Optional, Any

StatementMeta(, 4dde59e7-50be-40da-ab35-5a337512cf2e, 3, Finished, Available, Finished)

In [1]:
# fabric API logic

def create_fabric_session(fabric_token: str):
    """Build a ``requests.Session`` whose default headers target the Fabric API.

    Args:
        fabric_token: Access token placed in the ``Authorization`` header as a
            bearer token.

    Returns:
        A new session with ``Content-Type: application/json`` and the bearer
        ``Authorization`` header set as default headers.
    """
    session = requests.Session()
    session.headers.update({
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {fabric_token}',
    })
    return session
    

def assign_fabric_workspace_role(workspace_id: str, principal_id: str, principal_type: str, role: str, fabric_session):
    """Add a single role assignment to a workspace.

    Args:
        workspace_id: Id of the workspace the role is granted on.
        principal_id: Id of the principal receiving the role.
        principal_type: Principal type string sent as ``principal.type``.
        role: Role name sent as ``role``.
        fabric_session: Session passed through to ``fabric_request``.

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    url = f"workspaces/{workspace_id}/roleAssignments"
    data = {
        "principal": {
            "id": principal_id,
            "type": principal_type
        },
        "role": role
    }
    # The body is a nested dict, so it must be serialised as JSON. With the
    # default payloadtype ('data') requests would form-encode it and the
    # nested "principal" object would be lost.
    return fabric_request(fabric_session, url, 'POST', data, payloadtype='json')
    

def handle_lro(session: requests.Session, response: requests.Response, poll_interval: float = 10) -> requests.Response:
    """Poll a long running operation until it leaves its pending states.

    Args:
        session (requests.Session): The session object used for making HTTP requests.
        response (requests.Response): The initial response object whose
            ``Location`` header ends with the operation GUID.
        poll_interval (float): Seconds to wait between two status polls.
    Returns:
        requests.Response: The response of the ``/result`` request issued once
        polling stops.
    Raises:
        KeyError: If ``response`` has no ``Location`` header.

    The operation status is polled while it is "NotStarted", "Running" or
    "Undefined". If the status payload contains an "error" entry, the error is
    printed and polling stops; no exception is raised in that case and the
    ``/result`` endpoint is still requested.
    """
    # Local import: `sleep` is not imported at file level.
    from time import sleep

    operation_guid = response.headers['Location'].split('/')[-1]
    operation_url = f"https://api.fabric.microsoft.com/v1/operations/{operation_guid}"

    while True:
        # Parse the status payload once per poll instead of once per lookup.
        operation_state = session.get(url=operation_url).json()
        error = operation_state.get("error")
        if error:
            print(error)
            break
        if operation_state.get("status") not in ("NotStarted", "Running", "Undefined"):
            break
        # Only wait when another poll is actually needed.
        sleep(poll_interval)
    return session.get(url=f'{operation_url}/result')


def fabric_request(session: requests.Session, url: str, method: str, payload=None, payloadtype='data', wait_for_response = True) -> Any:
    """Send one request to the Fabric REST API and unwrap the response.

    Args:
        session: Session used to send the request.
        url: Path relative to ``https://api.fabric.microsoft.com/v1/``.
        method: ``GET``, ``POST``, ``PUT`` or ``DELETE`` (case-insensitive).
        payload: Request body for ``POST``/``PUT``.
        payloadtype: ``'data'`` to pass the payload as ``data=`` or ``'json'``
            to pass it as ``json=`` (case-insensitive).
        wait_for_response: For a 202 response, poll the long running
            operation via ``handle_lro`` when true; otherwise return at once.

    Returns:
        * an error string when method/payloadtype is not supported,
        * the decoded JSON body for 200/201 (``{}`` if the body is not JSON),
        * for 202 the decoded long running operation result, or the raw
          response text when ``wait_for_response`` is false,
        * for 429 the result of the retried request,
        * for other status codes >= 400 the decoded JSON body when it has an
          ``errorCode``, else ``None``,
        * ``None`` for any status code not listed above.
    """
    # Local import: `sleep` is not imported at file level.
    from time import sleep

    # Keep the relative path: the 429 retry must not re-prefix the base URL.
    endpoint = url
    url = f'https://api.fabric.microsoft.com/v1/{endpoint}'
    method = method.upper()
    payloadtype = payloadtype.lower()

    if method == 'GET':
        response = session.get(url=url)
    elif method == 'DELETE':
        response = session.delete(url=url)
    elif method in ('POST', 'PUT') and payloadtype in ('data', 'json'):
        # PUT really uses session.put for both payload types.
        send = session.post if method == 'POST' else session.put
        response = send(url=url, **{payloadtype: payload})
    else:
        return 'invalid method (GET,POST,PUT,DELETE) or invalid payloadtype (data,json)'

    status_code = response.status_code
    if status_code in (200, 201):
        try:
            return response.json()
        except requests.JSONDecodeError as e:
            # Successful response without a JSON body.
            print(e)
            return {}
    if status_code == 202:
        if wait_for_response:
            print('Request accepted: Waiting for Long Running Operation')
            return handle_lro(session, response).json()
        print("Request accepted: Don't Wait for result")
        return response.text
    if status_code == 429:
        retry_after = response.headers.get('Retry-After')
        try:
            delay = int(retry_after) if retry_after else 60
        except ValueError:
            # Retry-After may also be an HTTP date; use the default wait then.
            delay = 60
        sleep(delay)
        return fabric_request(session, endpoint, method, payload, payloadtype, wait_for_response)
    if status_code >= 400:
        print(F"{response.status_code=}\n{response.reason=}\n{response.text=}\n{url=}\n{payload=}\n{payloadtype=}")
        try:
            error_body = response.json()
            if error_body and error_body.get("errorCode"):
                print(F"{error_body.get('errorCode')}")
                return error_body
        except requests.JSONDecodeError as e:
            print(e)
    return None

def get_fabric_workspaces(session: requests.Session):
    """Issue a GET on the ``workspaces/`` endpoint and return ``fabric_request``'s result."""
    workspaces_endpoint = "workspaces/"
    return fabric_request(session, url=workspaces_endpoint, method='GET')

def create_fabric_workspaces(session: requests.Session, payload: Optional[Dict] = None):
    """POST to the ``workspaces/`` endpoint.

    Args:
        session: Session passed through to ``fabric_request``.
        payload: Optional request body (for example the workspace properties).
            When given it is sent as JSON; when omitted the request is sent
            without a body, as before.

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    if payload is None:
        return fabric_request(session, url="workspaces/", method='POST')
    return fabric_request(session, url="workspaces/", method='POST', payload=payload, payloadtype='json')

def create_fabric_connection(session: requests.Session, payload: Optional[Dict] = None):
    """POST to the ``connections/`` endpoint.

    Args:
        session: Session passed through to ``fabric_request``.
        payload: Optional request body describing the connection. When given
            it is sent as JSON; when omitted the request is sent without a
            body, as before.

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    if payload is None:
        return fabric_request(session, url="connections/", method='POST')
    return fabric_request(session, url="connections/", method='POST', payload=payload, payloadtype='json')

def assign_fabric_workspaces_capacity(capacity_id: str, workspace_id: str, session: requests.Session):
    """Request that a workspace be assigned to a capacity.

    Args:
        capacity_id: Id of the capacity, sent as ``capacityId`` in the body.
        workspace_id: Id of the workspace to assign.
        session: Session passed through to ``fabric_request``.

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    return fabric_request(
        session,
        # Use the `workspace_id` parameter; `workspaceId` was an undefined name.
        url=f"workspaces/{workspace_id}/assignToCapacity",
        method='POST',
        # Same body key as assign_fabric_workspace_capacity uses.
        payload={'capacityId': capacity_id},
        payloadtype='json',
    )
   
def create_fabric_lakehouse(lakehouse_name: str, workspace_id: str, session: requests.Session):
    """Create an item of type ``Lakehouse`` in a workspace.

    Args:
        lakehouse_name: Display name sent for the new item.
        workspace_id: Id of the workspace whose ``items`` endpoint is used.
        session: Session passed through to ``fabric_request``.

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    items_endpoint = f"workspaces/{workspace_id}/items"
    lakehouse_item = {'displayName': lakehouse_name, "type": "Lakehouse"}
    return fabric_request(
        session,
        url=items_endpoint,
        method='POST',
        payload=lakehouse_item,
        payloadtype='json',
    )

def create_fabric_warehouse(target_workspace_id: str, warehouse_name: str, fabric_session: requests.Session):
    """Create a warehouse in a workspace.

    Args:
        target_workspace_id: Id of the workspace whose ``warehouses`` endpoint
            is used.
        warehouse_name: Display name sent for the new warehouse.
        fabric_session: Session passed through to ``fabric_request``
            (annotated with the ``requests.Session`` class rather than the
            ``requests.session`` factory function).

    Returns:
        Whatever ``fabric_request`` returns for the POST.
    """
    return fabric_request(
        fabric_session,
        url=f"workspaces/{target_workspace_id}/warehouses",
        method='POST',
        payload={'displayName': warehouse_name},
        payloadtype='json',
    )



def get_fabric_items(session: requests.Session, workspace_id, item_id = ''):
    """Fetch workspace items, or the definition of one item.

    Args:
        session: Session passed through to ``fabric_request``.
        workspace_id: Id of the workspace to query.
        item_id: When left as the empty string, the workspace's ``items/``
            endpoint is queried with GET. Any other value triggers a POST to
            that item's ``getDefinition`` endpoint.

    Returns:
        Whatever ``fabric_request`` returns for the chosen request.
    """
    items_endpoint = f"workspaces/{workspace_id}/items/{item_id}"
    if item_id == '':
        return fabric_request(session, url=items_endpoint, method='GET')
    return fabric_request(session, url=f"{items_endpoint}/getDefinition", method='POST')

def assign_fabric_workspace_roles(session: requests.Session, workspace_id, roles):
    """Create the role assignments from ``roles`` that the workspace lacks.

    A role counts as already assigned when an existing assignment has the same
    ``principal.displayName``; such roles are skipped. Every other role is
    POSTed as JSON to the workspace's ``roleAssignments`` endpoint.

    Each decision is printed and appended to ``tasks``. NOTE(review): ``tasks``,
    ``time`` and ``start`` are not defined in this function, so they must
    exist at module level; confirm where they are set.

    Args:
        session: Session passed through to ``fabric_request``.
        workspace_id: Id of the workspace whose role assignments are managed.
        roles: Iterable of role dicts with ``principal.displayName`` and
            ``role`` entries; each dict is sent unchanged as the POST body.
    """
    assignments_endpoint = f'workspaces/{workspace_id}/roleAssignments'
    existing_roles = fabric_request(session, url=assignments_endpoint, method="GET")

    for role in roles:
        already_assigned = any(
            existing_role['principal']['displayName'] == role['principal']['displayName']
            for existing_role in existing_roles['value']
        )
        if already_assigned:
            message = f"Skipping: {role['principal']['displayName']} for {workspace_id}"
            print(message)
            tasks.append({"task_name": message, "task_duration": int(time() - start), "status": "success"})
        else:
            message = f"Authorize: {role['principal']['displayName']} as {role['role']} to {workspace_id}"
            print(message)
            # The task entry is recorded before the request is sent.
            tasks.append({"task_name": message, "task_duration": int(time() - start), "status": "success"})
            fabric_request(session, url=assignments_endpoint, method="POST", payload=role, payloadtype='json')

def assign_fabric_workspace_capacity (session : requests.Session, workspace_id, capacity_id):
    """Assign a workspace to a capacity if that capacity exists and is active.

    The capacities visible to ``session`` are listed and matched against
    ``capacity_id`` case-insensitively. The assignment request is only sent
    when the matching capacity's ``state`` is ``'Active'``, and it is sent
    without waiting for the long running operation. Progress is printed.

    Args:
        session: Session used for both the listing and the assignment.
        workspace_id: Id of the workspace to assign.
        capacity_id: Id of the capacity to assign the workspace to.
    """
    # Use the `session` parameter; the original relied on a `fabric_session`
    # name that is not defined in this function.
    capacities = fabric_request(session, "capacities", 'GET')

    for capacity in capacities['value']:
        if capacity['id'].lower() != capacity_id.lower():
            continue
        print(F"Capacity found ({capacity_id})")
        if capacity['state'] != 'Active':
            print(F"Capacity not active ({capacity_id})")
        else:
            print(F"Capacity ({capacity_id}) assigning to workspace ({workspace_id})")
            fabric_request(session, F"workspaces/{workspace_id}/assignToCapacity/", 'POST', payload={"capacityId" : capacity_id}, payloadtype='json', wait_for_response=False)
        break
    else:
        # Loop finished without a match.
        print(F"Capacity not found ({capacity_id})")



StatementMeta(, 25389dc0-07b3-4435-9f21-9788b08862c5, 3, Finished, Available, Finished)

NameError: name 'requests' is not defined

In [None]:
def id_for_existing_items(name, type, existing_items):
    """Look up the id of an already existing item by display name and type.

    Args:
        name: Display name to match against each entry's ``displayName``.
        type: Item type to match against each entry's ``type``.
        existing_items: Iterable of dicts with ``displayName``, ``type`` and
            ``id`` keys.

    Returns:
        The ``id`` of the first entry matching both values, or the sentinel
        string ``"New Item"`` when no entry matches.
    """
    matching_ids = (
        candidate["id"]
        for candidate in existing_items
        if name == candidate["displayName"] and type == candidate["type"]
    )
    # The generator is lazy, so the search stops at the first match.
    return next(matching_ids, "New Item")

StatementMeta(, 25389dc0-07b3-4435-9f21-9788b08862c5, -1, Cancelled, , Cancelled)

In [None]:
def deploy_items(items_to_deploy, guids_to_replace, fabric_access_token, workspace_id, existing_items):
    """Create or update each item in a workspace and record old/new id pairs.

    For every item:

    1. Each definition part is base64-decoded, every known old GUID is
       replaced by its new GUID, and for ``DataPipeline`` items activities
       that reference a connection missing from the target are deactivated.
       The part is then re-encoded.
    2. The item is created if ``id_for_existing_items`` reports
       ``"New Item"``; otherwise its definition is updated.
    3. A 202 response is polled until the operation reports ``Succeeded`` or
       ``Failed``.
    4. ``{"old_id", "new_id"}`` is appended to ``guids_to_replace`` so that
       items deployed later in the same call reference the new id.

    Progress entries are appended to ``tasks``. NOTE(review): ``tasks`` is not
    defined in this function, so it must exist at module level; confirm
    where it is created.

    Args:
        items_to_deploy: Item dicts with ``org_id``, ``displayName``, ``type``
            and optionally ``definition.parts[].payload`` (base64 text).
            The payloads are rewritten in place.
        guids_to_replace: List of ``{"old_id", "new_id"}`` dicts; extended in
            place.
        fabric_access_token: Bearer token for the Fabric API.
        workspace_id: Target workspace id.
        existing_items: Items already present in the target workspace, as
            expected by ``id_for_existing_items``.

    Raises:
        RuntimeError: If a newly created item yields no ``id`` (for example
            because the create request or its long running operation failed).
    """
    # Local imports: neither `base64` nor the `time` helpers are imported in
    # the visible part of the file.
    import base64
    from time import sleep, time

    base_url = 'https://api.fabric.microsoft.com/v1'
    header = {'Content-Type': 'application/json', 'Authorization': f'Bearer {fabric_access_token}'}

    response = requests.get(url=f'{base_url}/connections/', headers=header)
    connection_list = [connection['id'] for connection in response.json()['value']]

    for item in items_to_deploy:
        rename_item = {"old_id": item["org_id"]}
        print(f'Deploy {item["displayName"]} ({item["type"]})')
        start = time()

        if 'definition' in item:
            for part in item['definition']['parts']:
                decoded = base64.b64decode(part['payload']).decode('utf-8')

                for repl in guids_to_replace:
                    decoded = decoded.replace(repl["old_id"], repl["new_id"])

                if item["type"] == 'DataPipeline':
                    decoded = mark_activity_inactive_if_no_connection(decoded, connection_list)

                # b64encode returns bytes, which `json=` cannot serialise;
                # store the payload as text again.
                part['payload'] = base64.b64encode(decoded.encode('utf-8')).decode('ascii')

        existing_id = id_for_existing_items(item['displayName'], item['type'], existing_items)
        is_new = existing_id == "New Item"

        if is_new:
            print("Create")
            url = f'{base_url}/workspaces/{workspace_id}/items/'
            response = requests.post(url=url, headers=header, json=item)
            tasks.append({"task_name":f"create item {item.get('displayName')} initially", "task_duration": int(time() - start), "status": "success"})
        else:
            print(f"Update {existing_id}")
            url = f'{base_url}/workspaces/{workspace_id}/items/{existing_id}/updateDefinition'
            response = requests.post(url=url, headers=header, json=item)
            tasks.append({"task_name":f"Update item  Definition {item.get('displayName')}", "task_duration": int(time() - start), "status": "success"})

        # Everything below runs once per item. In the original it was
        # dedented out of the loop and therefore ran for the last item only.
        result = None
        if response.status_code == 202:
            operation_url = response.headers["location"]
            operation_status = 'Running'
            while operation_status not in ('Succeeded', 'Failed'):
                sleep(1.5)
                operation_status = requests.get(url=operation_url, headers=header).json()['status']
                print(operation_status)
            # The result is only needed for the id of a new item, and only
            # exists once the operation has succeeded.
            if is_new and operation_status == 'Succeeded':
                result = requests.get(url=operation_url + "/result", headers=header).json()
        elif is_new:
            result = response.json()

        if is_new:
            new_id = result.get("id") if isinstance(result, dict) else None
            if new_id is None:
                raise RuntimeError(
                    f'Creating {item["displayName"]} ({item["type"]}) returned no item id '
                    f'(HTTP {response.status_code})'
                )
            rename_item["new_id"] = new_id
        else:
            rename_item["new_id"] = existing_id

        guids_to_replace.append(rename_item)

        duration = int(time() - start)
        tasks.append({"task_name":f'Deploy {item["displayName"]}', "task_duration": duration, "status": "success"})

def mark_activity_inactive_if_no_connection(data: str, target_guids: List) -> str:
    """Deactivate pipeline activities whose connection is not in ``target_guids``.

    ``data`` is parsed as JSON. Every entry of ``properties.activities`` is
    checked, and so is every activity listed directly under its
    ``typeProperties`` keys ``activities``, ``ifFalseActivities`` and
    ``ifTrueActivities``. An activity is checked through the
    ``externalReferences`` found in it or in its nested dicts (lists are not
    searched). If that reference's ``connection`` is not in ``target_guids``,
    the activity gets ``state = "Inactive"`` and
    ``onInactiveMarkAs = "Succeeded"``.

    Args:
        data: JSON text of a pipeline definition.
        target_guids: Connection ids that exist in the target environment.

    Returns:
        The re-serialised JSON text. An empty ``data`` is returned as ``""``,
        and a document without ``properties.activities`` is re-serialised
        without changes.
    """
    # Local import: the file only imports `loads`/`dumps` from json, while
    # this function addresses the module as `json`.
    import json

    if not data:
        print("no data")
        # Return a str, as annotated, so callers can keep calling .encode().
        return ""

    def find_externalReferences_in_dict(j: Dict) -> Dict:
        # Recursively collect "externalReferences" from j and its nested
        # dicts; an entry on j itself wins over nested ones.
        found = {}
        for key, value in j.items():
            if isinstance(value, dict) and value:
                found.update(find_externalReferences_in_dict(value))
            if key == "externalReferences":
                found[key] = value
        return found

    def deactivate_if_unconnected(activity: Dict, label: str) -> None:
        # Mark the activity inactive when its connection is not a target GUID.
        references = find_externalReferences_in_dict(activity).get("externalReferences")
        if not references or not isinstance(references, dict):
            return
        connection = references.get("connection")
        if connection not in target_guids:
            print(f"Deactivate {label} {activity.get('name')} for connection {connection}")
            activity["state"] = "Inactive"
            activity["onInactiveMarkAs"] = "Succeeded"

    pipeline = json.loads(data)

    activities = None
    if isinstance(pipeline, dict):
        properties = pipeline.get("properties")
        if isinstance(properties, dict):
            activities = properties.get("activities")
    if not activities:
        return json.dumps(pipeline)

    for activity in activities:
        deactivate_if_unconnected(activity, "activity")

        # Not every activity carries typeProperties.
        type_properties = activity.get("typeProperties") or {}
        for container_key in ("activities", "ifFalseActivities", "ifTrueActivities"):
            for nested_activity in type_properties.get(container_key) or []:
                deactivate_if_unconnected(nested_activity, "sub_activity")

    return json.dumps(pipeline)

StatementMeta(, 25389dc0-07b3-4435-9f21-9788b08862c5, -1, Cancelled, , Cancelled)