# Script to import Data Warehouse objects in the Fabric Workspace

**Parameter Block**

Update the parameters before executing the notebook

In [7]:
#Parameters
workspace_name = 'Deployment_DWA_Test' #Add your workspace name to import DWA
deploy_aw = True #Set this flag to True if you want to deploy AW 


StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 9, Finished, Available, Finished)

In [8]:
import struct
import sqlalchemy
from sqlalchemy.sql import text
from notebookutils import mssparkutils
import sempy.fabric as fabric
import base64
from azure.core.credentials import AccessToken
from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import DefaultAzureCredential
import os
import pyodbc
import shutil
from git import Repo
import requests
import json

StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 10, Finished, Available, Finished)

**Code block to create a Lakehouse**

In [9]:
workspace_id = spark.conf.get("trident.workspace.id")
base_url = "https://api.fabric.microsoft.com/v1"
access_token = mssparkutils.credentials.getToken('pbi')

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

#Create Lakehouse
lakehouse_url = f"{base_url}/workspaces/{workspace_id}/lakehouses"
payload = {
    "displayName": "LH",
    "description": "A schema-enabled lakehouse.",
    "creationPayload": {"enableSchemas": True}
}
response = requests.post(lakehouse_url, headers=headers, json=payload)

if response.status_code == 201:
    print(f"Lakehouse {payload['displayName']} created successfully!")
    print(response.json())
elif response.status_code == 400:
    print(f"Lakehouse {payload['displayName']} is already in use")
else:
    print("Failed to create Lakehouse: ")
    raise RuntimeError(response.status_code, response.text)

lakehouse_name = payload["displayName"]

StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 11, Finished, Available, Finished)

Lakehouse LH is already in use


 **Code block to create metadata based SQL DB**

In [10]:
payload = {
    "displayName": "Meta",
    "type": "SQLDatabase",
    "description": "Created using Python in Microsoft Fabric"
}

sqldb_url = f'{base_url}/workspaces/{workspace_id}/items'
response = requests.post(sqldb_url, headers=headers, json=payload)

if response.status_code == 400:
    print(f"SQL DB {payload['displayName']} is already in use")
else:
    time.sleep(300)                    # Wait for database creation
    if response.status_code == 201:
        print(f"Database {body['displayName']} created successfully:")
        print(response.json())
    else:
        print(f"Failed to create database: ", response.status_code, response.text)

StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 12, Finished, Available, Finished)

SQL DB Meta is already in use


**Code block to create Data Warehouse**

In [11]:
payload = {
    "displayName": "DW",
    "type": "warehouse",
    "properties": {
        "collation": "Latin1_General_100_CI_AS_KS_WS_SC_UTF8"
    },
    "description": "Created using Python in Microsoft Fabric"
}

sqldb_url = f'{base_url}/workspaces/{workspace_id}/items'
response = requests.post(sqldb_url, headers=headers, json=payload)

if response.status_code == 400:
    print(f"Warehouse {payload['displayName']} is already in use")
else:
    time.sleep(300)                    # Wait for database creation
    if response.status_code == 201:
        print(f"Warehouse {body['displayName']} created successfully:")
        print(response.json())
    else:
        print(f"Failed to create warehouse: ", response.status_code, response.text)

StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 13, Finished, Available, Finished)

Warehouse DW is already in use


**Code Block to copy Adventure Works Files into Lakehouse Files/Landing/aw when deploy_aw is set to TRUE**

In [12]:
if deploy_aw == False:
    mssparkutils.notebook.exit(1)
else:
    class CustomTokenCredential:
        def __init__(self, token):
            self.token = token

        def get_token(self, *scopes, **kwargs):
            return AccessToken(self.token, expires_on=9999999999)  # Set a far future expiration

    credential = CustomTokenCredential(notebookutils.credentials.getToken('storage'))
    service_client = DataLakeServiceClient(account_url=f"https://onelake.dfs.fabric.microsoft.com", credential=credential)
    fs = service_client.get_file_system_client(workspace_name)

    lh_paths = {
        "tmp/landing/": f"{lakehouse_name}.Lakehouse/Files/landing",
        "tmp/Tables/": f"{lakehouse_name}.Lakehouse/Tables"
    }

    def clone_and_unpack(repo_url, repo_dir, backup_dir, archives):
        if os.path.exists(repo_dir):
            print("Repo already cloned.")
        Repo.clone_from(repo_url, repo_dir)
        os.chdir(repo_dir)
        for archive, target in archives.items():
            shutil.unpack_archive(os.path.join(backup_dir, archive), target, "zip")

    def upload_files(local_path, azure_path):
        for root, _, files in os.walk(local_path):
            for file in files:
                file_path_on_local = os.path.join(root, file)
                relative_path = os.path.relpath(root, local_path)
                file_path_on_azure = os.path.join(azure_path, relative_path, file).replace("\\", "/")
                file_client = fs.get_file_client(file_path_on_azure)
                with open(file_path_on_local, "rb") as data:
                    file_client.upload_data(data, overwrite=True)

    repo_url = "https://github.com/ProdataSQL/DWA"
    repo_dir = "DWA_repo"
    git_lh_directory = "Backup/Files/Lakehouse"
    archives = {
        'AW_landing.zip': "tmp/landing",
        'AW_tables.zip': "tmp/Tables"
    }

    clone_and_unpack(repo_url, repo_dir, git_lh_directory, archives)

    for local, azure in lh_paths.items():
        upload_files(local, azure)


StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 14, Finished, Available, Finished)

**Code to Import Notebooks**

In [13]:
github_api_url = "https://api.github.com/repos/ProdataSQL/DWA/contents/Workspaces/DWA"
github_raw_base = "https://raw.githubusercontent.com/ProdataSQL/DWA/main"
notebook_url = f"{base_url}/workspaces/{workspace_id}/notebooks"

# Fetch directory contents from GitHub
response = requests.get(github_api_url)
if response.status_code == 200:
    items = response.json()
    for item in items:
        # Identify directories ending with .Notebook
        if item['type'] == 'dir' and item['name'].endswith('.Notebook'):
            notebook_name = item['name']
            contents_url = f"{github_raw_base}/Workspaces/DWA/{notebook_name}/notebook-content.py"
            contents_response = requests.get(contents_url)
            if contents_response.status_code == 200:
                # Convert contents.py to Jupyter Notebook format
                py_content = contents_response.text
                notebook_content = {
                    "nbformat": 4,
                    "nbformat_minor": 5,
                    "cells": [
                        {
                            "cell_type": "code",
                            "source": [py_content],
                            "execution_count": None,
                            "outputs": []
                        }
                    ],
                    "metadata": {
                        "language_info": {
                            "name": "python"
                        }
                    }
                }
                # Encode notebook content in Base64
                notebook_json = json.dumps(notebook_content)
                notebook_base64 = base64.b64encode(notebook_json.encode('utf-8')).decode('utf-8')
                
                # Prepare payload for Fabric API
                payload = {
                    "displayName": notebook_name,
                    "description": f"Imported notebook {notebook_name}",
                    "definition": {
                        "format": "ipynb",
                        "parts": [
                            {
                                "path": "artifact.content.ipynb",
                                "payload": notebook_base64,
                                "payloadType": "InlineBase64"
                            }
                        ]
                    }
                }
                
                # Upload notebook to Microsoft Fabric
                fabric_response = requests.post(
                    notebook_url.format(workspace_id=workspace_id),
                    headers=headers,
                    data=json.dumps(payload)
                )
                
                if fabric_response.status_code in [200, 201, 202]:
                    print(f"Successfully uploaded: {notebook_name}")
                else:
                    print(f"Failed to upload {notebook_name}: {fabric_response.status_code} - {fabric_response.text}")
            else:
                raise RuntimeError(f"Failed to download contents.py for {notebook_name}: {contents_response.status_code}")
else:
    raise RuntimeError(f"Failed to fetch GitHub directory contents: {response.status_code}, {response.text}")


StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 15, Finished, Available, Finished)

Failed to upload Demo - 10 Mins.Notebook: 400 - {"requestId":"a859b8ae-1aa5-47cb-88d8-195b067469b3","errorCode":"ItemDisplayNameAlreadyInUse","message":"Requested 'Demo - 10 Mins.Notebook' is already in use"}
Failed to upload Extract-CSV-Pandas.Notebook: 400 - {"requestId":"5ce30ee9-66b2-4a0a-bbc0-ee4d9f956dfe","errorCode":"ItemDisplayNameAlreadyInUse","message":"Requested 'Extract-CSV-Pandas.Notebook' is already in use"}
Failed to upload Extract-CSV.Notebook: 400 - {"requestId":"24dd5354-4e3c-4a6b-b95b-161122a395b4","errorCode":"ItemDisplayNameAlreadyInUse","message":"Requested 'Extract-CSV.Notebook' is already in use"}
Failed to upload Extract-Dictionary.Notebook: 400 - {"requestId":"a9a667d5-0a15-464b-907f-5e7980b03b8a","errorCode":"ItemDisplayNameAlreadyInUse","message":"Requested 'Extract-Dictionary.Notebook' is already in use"}
Failed to upload Extract-Fabric-Logs.Notebook: 400 - {"requestId":"8c900dff-1f01-462c-ac2c-792251a31e45","errorCode":"ItemDisplayNameAlreadyInUse","messag

**Deploy Meta DB and Warehouse SQL Objects**

In [25]:
def get_connection_string(workspace_id: str, db_type: str, display_name: str) -> str:
    client = fabric.FabricRestClient()
    endpoint = f"/v1/workspaces/{workspace_id}/{db_type}"
    databases = client.get(endpoint).json()
    database = next((db for db in databases.get("value", []) if db.get("displayName") == display_name), None)
    
    if not database:
        raise ValueError(f"No {db_type} with displayName '{display_name}' found.")
    
    server = database['properties'].get('serverFqdn') or database['properties'].get('connectionString')
    if db_type == "Warehouses":
        database = display_name
    else: 
        database = database['properties']['databaseName']
    return f"Driver={{ODBC Driver 18 for SQL Server}};Server={server};database={database};LongAsMax=YES"

def create_engine(connection_string: str):
    token = mssparkutils.credentials.getToken('https://analysis.windows.net/powerbi/api').encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token)}s', len(token), token)
    return sqlalchemy.create_engine("mssql+pyodbc://", creator=lambda: pyodbc.connect(connection_string, attrs_before={1256: token_struct}))

def execute_sql_script(engine, script):
    statements = script.split("\nGO\n")
    check_objects_query = "SELECT COUNT(*) AS TableCount FROM sys.tables; "
    
    with engine.connect() as conn:
        result = conn.execute(text(check_objects_query))
        rows = [dict(row) for row in result.mappings()]

        if rows[0]['TableCount'] == 0:
            try:
                for statement in statements:
                    statement = statement.strip()
                    if statement:
                        conn.execute(text(statement))
                        conn.commit()
            except:
                conn.rollback()
                raise
        else:
            print(f"Objects already exist")

def process_sql_script(workspace_id: str, db_type: str, display_name: str, script_path: str):
    connection_string = get_connection_string(workspace_id, db_type, display_name)
    engine = create_engine(connection_string)
    
    with open(script_path, "r", encoding="utf-16") as file:
        script = file.read()
    
    execute_sql_script(engine, script)

# Process Meta SQL Database
process_sql_script(workspace_id, "SQLDatabases", "Meta", "Backup/Files/SQLDB/SQLDB_Script.sql")

# Process Data Warehouse
process_sql_script(workspace_id, "Warehouses", "DW", "Backup/Files/DataWarehouse/DW_script.sql")


StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, 27, Finished, Available, Finished)

Engine(mssql+pyodbc://)
Objects already exist
Engine(mssql+pyodbc://)
Objects already exist


**Code to import Data Pipelines**

In [None]:
#pipelines_url = f"{base_url}/workspaces/{workspace_id}/dataPipelines"
#
## Fetch GitHub directory contents
#response = requests.get(github_api_url)
#
#if response.status_code == 200:
#    for item in response.json():
#        # Identify directories ending with .DataPipeline
#        if item.get('type') == 'dir' and item.get('name', '').endswith('.DataPipeline'):
#            pipeline_name = item['name']                                  
#            file_url = f"{github_raw_base}{pipeline_name}/pipeline-content.json"
#            r = requests.get(file_url)
#            if r.status_code != 200:
#                print(f"Failed to download {pipeline_name}: {r.status_code}")
#                continue
#            try:
#                pipeline_def = r.json()  # The JSON definition of your data pipeline
#            except Exception as e:
#                print(f"Error parsing JSON for {pipeline_name}: {e}")
#                continue
#
#            # Encode pipeline content in Base64
#            pipeline_b64 = base64.b64encode(json.dumps(pipeline_def).encode()).decode()
#            
#            # Prepare payload for Fabric DataPipeline API
#            payload = {
#                "displayName": pipeline_name,
#                "description": f"Imported data pipeline {pipeline_name}",
#                "definition": {
#                    "format": "json",
#                    "parts": [{
#                        "path": "artifact.content.json",
#                        "payload": pipeline_b64,
#                        "payloadType": "InlineBase64"
#                    }]
#                }
#            }
#
#            # Check if the pipeline already exists
#            list_response = requests.get(pipelines_url, headers=headers)
#            existing = None
#            if list_response.status_code == 200:
#                pipelines = list_response.json()
#                if isinstance(pipelines, dict) and "value" in pipelines:
#                    pipelines = pipelines["value"]
#                for pl in pipelines:
#                    if isinstance(pl, dict) and pl.get('displayName') == pipeline_name:
#                        existing = pl
#                        break
#
#            if existing:
#                update_url = f"{base_url}/workspaces/{workspace_id}/dataPipelines/{existing['id']}"
#                upd_response = requests.put(update_url, headers=headers, data=json.dumps(payload))
#                if upd_response.status_code in [200, 201, 202]:
#                    print(f"Updated: {pipeline_name}")
#                else:
#                    print(f"Failed to update {pipeline_name}: {upd_response.status_code} - {upd_response.text}")
#            else:
#                post_response = requests.post(pipelines_url, headers=headers, data=json.dumps(payload))
#                if post_response.status_code in [200, 201, 202]:
#                    print(f"Uploaded: {pipeline_name}")
#                else:
#                    print(f"Failed to upload {pipeline_name}: {post_response.status_code} - {post_response.text}")
#else:
#    raise RuntimeError(f"Failed to fetch GitHub directory contents: {response.status_code}, {response.text}")

StatementMeta(, 8c28822d-cef4-44ad-8d1e-adb8ba4fe942, -1, Cancelled, , Cancelled)