In [None]:
import requests, json, logging

logger = logging.getLogger(__name__)

class fabric_rest:
    def __init__(self, audience:str='pbi'):
        self.header = self.create_header(audience)


    def create_header(self, audience:str='pbi') -> dict:
        return {'Authorization': f'Bearer {mssparkutils.credentials.getToken(audience)}', 'Content-type': 'application/json'}


    def call_rest(self, method:str, url:str, body:dict=None) -> requests.Response():
        try:
            response = requests.request(method=method, url=url, headers=self.header, data=json.dumps(body))
            response.raise_for_status()
            logger.info(f"Success - {response}")
            return response
        except Exception as e:
            print(f"{e} - {response.text}")


    # https://learn.microsoft.com/en-us/rest/api/fabric/admin/workspaces/list-workspaces?tabs=HTTP
    def get_workspace_id(self, workspaceName:str) -> str:
        response = self.call_rest(method='get', url='https://api.fabric.microsoft.com/v1/workspaces')
        workspaceId = [workspace.get('id') for workspace in response.json().get('value') if workspace.get('displayName') == workspaceName][0]
        return workspaceId


    # https://learn.microsoft.com/en-us/rest/api/fabric/admin/items/list-items?tabs=HTTP
    def get_pipeline_id(self, workspaceName:str, pipelineName:str) -> str:
        workspaceId = self.get_workspace_id(workspaceName=workspaceName)
        response = self.call_rest(method='get', url=f'https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items?type=DataPipeline')
        pipelineId = [pipeline.get('id') for pipeline in response.json().get('value') if pipeline.get('displayName') == pipelineName][0]
        return pipelineId


    # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/get-item-definition?tabs=HTTP
    def get_item_definition_parts(self, workspaceName:str, pipelineName:str) -> str:
        workspaceId = self.get_workspace_id(workspaceName=workspaceName)
        pipelineId = self.get_pipeline_id(workspaceName=workspaceName, pipelineName=pipelineName)
        response = self.call_rest(method='post', url=f'https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{pipelineId}/getDefinition')
        itemDefinitionParts = response.json().get('definition').get('parts')
        return itemDefinitionParts


    # https://learn.microsoft.com/en-us/rest/api/fabric/core/items/create-item?tabs=HTTP
    def clone_pipeline(self, workspaceNameSource:str, pipelineNameSource:str, workspaceNameTarget:str, pipelineNameTarget:str) -> requests.Response():
        pipelinePartsList = self.get_item_definition_parts(workspaceName=workspaceNameSource, pipelineName=pipelineNameSource)

        body = {"displayName": pipelineNameTarget
                ,"type": "DataPipeline"
                ,"definition": {
                    "parts": pipelinePartsList
                    }
                }

        workspaceId = self.get_workspace_id(workspaceName=workspaceNameTarget)
        
        response = self.call_rest(method='post', url=f'https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items', body=body)
        return response


##
# Unit Tests
##

# Get workspace id
fabric_rest().get_workspace_id(workspaceName='WS_Steve')

# Get pipeline id
fabric_rest().get_pipeline_id(workspaceName='WS_Steve', pipelineName='PL_Simple')

# Get pipeline definition
fabric_rest().get_item_definition_parts(workspaceName='WS_Steve', pipelineName='PL_Simple')

# Clone an existing pipeline
fabric_rest().clone_pipeline(workspaceNameSource='WS_Steve', pipelineNameSource='PL_CreatedFromAPI', workspaceNameTarget='WS_Steve', pipelineNameTarget='PL_CreatedFromAPI_clone')