### 000_spn_create_fabric_items

#### Creates Microsoft Fabric workspace items (Lakehouse, Notebook, Warehouse, Pipeline, Dataflow Gen2) via REST API using Service Principal authentication.
##### Creating items as a Service Principal avoids the object-staleness issues that occur when the user who created them leaves the organization or has not signed in for a while.

Prerequisites:
 - Service Principal with Fabric API permissions
 - SPN added as Workspace Admin (or Contributor) in target workspace
 - Key Vault access (only if using Key Vault-based credential retrieval, which is recommended)

Usage:
1. Configure credentials in CONFIGURATION section (Key Vault OR manual)
2. Run all cells through "Core Functions"
3. Uncomment/call desired create_* functions in "Execution" section

In [None]:
import requests
from typing import Optional
import notebookutils.credentials as cred

### Configuration

In [None]:
# Choose ONE authentication method: Key Vault (recommended) OR Manual.
#
# NOTE: the Option B assignments below run unconditionally. If you enable
# Option A, comment out (or delete) the Option B assignments as well;
# otherwise their empty strings overwrite the values read from Key Vault,
# because Option B is executed after Option A.

# -----------------------------------------------------------------------------
# Option A: Key Vault-based credentials (RECOMMENDED for production)
# -----------------------------------------------------------------------------
# Uncomment and configure the following block to use Key Vault.
# `cred` is the `notebookutils.credentials` module imported at the top of the
# notebook; the secret names below are examples and must match your vault.
#

# KEY_VAULT_NAME = "your-keyvault-name"  # e.g., "kv-fabric-prod"
# KEY_VAULT_URL = f"https://{KEY_VAULT_NAME}.vault.azure.net/"
#
# TENANT_ID = cred.getSecret(KEY_VAULT_URL, "aad-tenant-id")
# SP_CLIENT_ID = cred.getSecret(KEY_VAULT_URL, "fabric-spn-client-id")
# SP_CLIENT_SECRET = cred.getSecret(KEY_VAULT_URL, "fabric-spn-client-secret")

# -----------------------------------------------------------------------------
# Option B: Manual credentials (for development/testing ONLY)
# -----------------------------------------------------------------------------
# WARNING: Never commit credentials to source control!
# These three values are passed to get_access_token() in the token cell.
TENANT_ID = ""          # Azure AD Tenant ID
SP_CLIENT_ID = ""       # Service Principal Application (Client) ID
SP_CLIENT_SECRET = ""   # Service Principal Client Secret

# -----------------------------------------------------------------------------
# Workspace Configuration (auto-detected from runtime context)
# -----------------------------------------------------------------------------
# NOTE(review): `mssparkutils` is not imported in this file; presumably it is
# injected as a built-in by the Fabric notebook runtime -- confirm before
# running this code outside a Fabric notebook.
_context = mssparkutils.runtime.context
# Every create_* function builds its endpoint from this ID, so items are
# created in whichever workspace the runtime context reports as current.
WORKSPACE_ID = _context["currentWorkspaceId"]
print(f"Target Workspace ID: {WORKSPACE_ID}")

# -----------------------------------------------------------------------------
# API Configuration
# -----------------------------------------------------------------------------
# Base URL that every item-creation endpoint path is appended to.
FABRIC_API_BASE_URL = "https://api.fabric.microsoft.com/v1"
# Token endpoint; `{tenant_id}` is filled in by get_access_token().
OAUTH_TOKEN_URL_TEMPLATE = "https://login.microsoftonline.com/{tenant_id}/oauth2/token"
# Sent as the `resource` form field of the token request.
FABRIC_RESOURCE_URI = "https://api.fabric.microsoft.com"

### Core Functions

In [None]:
# =============================================================================
# AUTHENTICATION
# =============================================================================
def get_access_token(
    tenant_id: str,
    client_id: str,
    client_secret: str,
    timeout: float = 30.0
) -> str:
    """
    Acquire an OAuth2 access token using the client credentials flow.

    Posts the Service Principal's credentials to the token endpoint built
    from OAUTH_TOKEN_URL_TEMPLATE, requesting a token for FABRIC_RESOURCE_URI,
    and returns the bearer token used for Fabric API calls.

    Args:
        tenant_id: Azure AD tenant identifier (GUID)
        client_id: Service Principal application (client) ID
        client_secret: Service Principal client secret value
        timeout: Seconds to wait for the token endpoint before giving up.
                 Without a timeout, requests would wait indefinitely on a
                 stalled connection.

    Returns:
        str: Bearer access token for Fabric API authentication

    Raises:
        ValueError: If any credential argument is empty or whitespace-only
                    (e.g. the CONFIGURATION placeholders were left blank).
                    Raised before any network call is made.
        requests.Timeout: If the token endpoint does not respond in time
        requests.HTTPError: If authentication fails (invalid credentials,
                           insufficient permissions, etc.)
        KeyError: If response doesn't contain expected 'access_token' field

    Example:
        >>> token = get_access_token(TENANT_ID, SP_CLIENT_ID, SP_CLIENT_SECRET)
        >>> print(f"Token acquired: {token[:20]}...")
    """
    # Fail fast on blank credentials: an empty tenant ID would otherwise yield
    # a malformed token URL and an opaque HTTP error. Only argument NAMES are
    # reported so that secret values never end up in notebook output.
    supplied = {
        "tenant_id": tenant_id,
        "client_id": client_id,
        "client_secret": client_secret,
    }
    missing = [
        name for name, value in supplied.items()
        if not value or not str(value).strip()
    ]
    if missing:
        raise ValueError(
            "Missing Service Principal credential(s): "
            + ", ".join(missing)
            + ". Configure them in the CONFIGURATION section."
        )

    token_url = OAUTH_TOKEN_URL_TEMPLATE.format(tenant_id=tenant_id)

    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }

    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": FABRIC_RESOURCE_URI
    }

    # `data=` sends the payload form-encoded, matching the Content-Type above.
    response = requests.post(
        token_url, data=payload, headers=headers, timeout=timeout
    )
    response.raise_for_status()

    # A KeyError here means the endpoint answered 2xx without a token.
    return response.json()["access_token"]


# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _get_auth_headers(access_token: str) -> dict:
    """
    Construct standard authorization headers for Fabric API requests.
    
    Args:
        access_token: Valid OAuth2 bearer token
    
    Returns:
        dict: Headers dictionary with Authorization and Content-Type
    """
    return {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }


def _make_fabric_request(
    access_token: str,
    endpoint_path: str,
    payload: dict,
    item_type: str,
    item_name: str,
    timeout: float = 60.0
) -> dict:
    """
    Execute a POST request to the Fabric REST API with standardized error handling.

    Args:
        access_token: Valid OAuth2 bearer token
        endpoint_path: API endpoint path (appended to FABRIC_API_BASE_URL)
        payload: JSON request body
        item_type: Human-readable item type for logging (e.g., "Lakehouse")
        item_name: Display name of item being created
        timeout: Seconds to wait for the API to respond before giving up.
                 Without a timeout, requests would wait indefinitely on a
                 stalled connection.

    Returns:
        dict: API response JSON (empty dict if the response has no body,
              e.g. 202 Accepted)

    Raises:
        requests.Timeout: If the API does not respond within `timeout` seconds
        requests.HTTPError: If API returns error status code
    """
    url = f"{FABRIC_API_BASE_URL}{endpoint_path}"
    headers = _get_auth_headers(access_token)

    response = requests.post(url, headers=headers, json=payload, timeout=timeout)

    # 201 = Created, 202 = Accepted (long-running provisioning).
    # Any 4xx/5xx status raises here, so the success message below is only
    # printed for successful responses.
    response.raise_for_status()

    status_msg = "created" if response.status_code == 201 else "provisioning started"
    print(f"[SUCCESS] {item_type} '{item_name}' {status_msg} (HTTP {response.status_code})")

    # Some endpoints return empty body on 202; calling .json() on an empty
    # body would raise, so fall back to an empty dict.
    if response.content:
        return response.json()
    return {}


# =============================================================================
# CORE FUNCTIONS - FABRIC ITEM CREATION
# =============================================================================
def create_lakehouse(
    access_token: str,
    lakehouse_name: str,
    enable_schemas: bool = False
) -> dict:
    """
    Provision a Lakehouse in the workspace identified by WORKSPACE_ID.

    Sends a create request to the workspace's `lakehouses` endpoint. When
    schemas are requested, the request body additionally carries a
    `creationPayload` with `enableSchemas` set to True; otherwise that key is
    omitted entirely.

    Args:
        access_token: OAuth2 bearer token used to authorize the request
        lakehouse_name: Display name to give the new Lakehouse
        enable_schemas: Truthy to request a schema-enabled Lakehouse

    Returns:
        dict: Whatever _make_fabric_request returns for the call

    Raises:
        requests.HTTPError: If the API answers with an error status

    Example:
        >>> create_lakehouse(token, "lh_bronze", enable_schemas=True)
        [SUCCESS] Lakehouse 'lh_bronze' created (HTTP 201)
    """
    request_body = {"displayName": lakehouse_name}
    if enable_schemas:
        # Opt in to the schema-enabled variant only on request.
        request_body["creationPayload"] = {"enableSchemas": True}

    return _make_fabric_request(
        access_token=access_token,
        endpoint_path=f"/workspaces/{WORKSPACE_ID}/lakehouses",
        payload=request_body,
        item_type="Lakehouse",
        item_name=lakehouse_name,
    )


def create_notebook(access_token: str, notebook_name: str) -> dict:
    """
    Provision a Notebook in the workspace identified by WORKSPACE_ID.

    Only a display name is sent in the request body, so no notebook content
    is supplied by this call.

    Args:
        access_token: OAuth2 bearer token used to authorize the request
        notebook_name: Display name to give the new Notebook

    Returns:
        dict: Whatever _make_fabric_request returns for the call

    Raises:
        requests.HTTPError: If the API answers with an error status

    Example:
        >>> create_notebook(token, "nb_100_ingest_raw_data")
        [SUCCESS] Notebook 'nb_100_ingest_raw_data' created (HTTP 201)
    """
    notebooks_path = f"/workspaces/{WORKSPACE_ID}/notebooks"
    request_body = dict(displayName=notebook_name)

    result = _make_fabric_request(
        access_token=access_token,
        endpoint_path=notebooks_path,
        payload=request_body,
        item_type="Notebook",
        item_name=notebook_name,
    )
    return result


def create_warehouse(access_token: str, warehouse_name: str) -> dict:
    """
    Provision a Warehouse in the workspace identified by WORKSPACE_ID.

    Sends a create request containing only the display name to the
    workspace's `warehouses` endpoint.

    Args:
        access_token: OAuth2 bearer token used to authorize the request
        warehouse_name: Display name to give the new Warehouse

    Returns:
        dict: Whatever _make_fabric_request returns for the call

    Raises:
        requests.HTTPError: If the API answers with an error status

    Example:
        >>> create_warehouse(token, "wh_gold")
        [SUCCESS] Warehouse 'wh_gold' provisioning started (HTTP 202)
    """
    return _make_fabric_request(
        access_token=access_token,
        endpoint_path=f"/workspaces/{WORKSPACE_ID}/warehouses",
        payload={"displayName": warehouse_name},
        item_type="Warehouse",
        item_name=warehouse_name,
    )


def create_pipeline(access_token: str, pipeline_name: str) -> dict:
    """
    Provision a Data Pipeline in the workspace identified by WORKSPACE_ID.

    Only a display name is sent in the request body, so no activities are
    defined by this call.

    Args:
        access_token: OAuth2 bearer token used to authorize the request
        pipeline_name: Display name to give the new Pipeline

    Returns:
        dict: Whatever _make_fabric_request returns for the call

    Raises:
        requests.HTTPError: If the API answers with an error status

    Example:
        >>> create_pipeline(token, "pl_100_daily_refresh")
        [SUCCESS] Pipeline 'pl_100_daily_refresh' created (HTTP 201)
    """
    # Collect the call arguments first, then dispatch in a single step.
    request_kwargs = {
        "access_token": access_token,
        "endpoint_path": f"/workspaces/{WORKSPACE_ID}/dataPipelines",
        "payload": {"displayName": pipeline_name},
        "item_type": "Pipeline",
        "item_name": pipeline_name,
    }
    return _make_fabric_request(**request_kwargs)


def create_dataflow_gen2(access_token: str, dataflow_name: str) -> dict:
    """
    Provision a Dataflow Gen2 in the workspace identified by WORKSPACE_ID.

    Sends a create request containing only the display name to the
    workspace's `dataflows` endpoint.

    Args:
        access_token: OAuth2 bearer token used to authorize the request
        dataflow_name: Display name to give the new Dataflow

    Returns:
        dict: Whatever _make_fabric_request returns for the call

    Raises:
        requests.HTTPError: If the API answers with an error status

    Example:
        >>> create_dataflow_gen2(token, "df_transform_customers")
        [SUCCESS] Dataflow Gen2 'df_transform_customers' created (HTTP 201)
    """
    dataflows_path = f"/workspaces/{WORKSPACE_ID}/dataflows"

    request_body = {}
    request_body["displayName"] = dataflow_name

    return _make_fabric_request(
        access_token=access_token,
        endpoint_path=dataflows_path,
        payload=request_body,
        item_type="Dataflow Gen2",
        item_name=dataflow_name,
    )




In [None]:
# Authenticate once up front: every create_* call in the Execution section
# takes this bearer token as its first argument.
access_token = get_access_token(
    tenant_id=TENANT_ID,
    client_id=SP_CLIENT_ID,
    client_secret=SP_CLIENT_SECRET,
)
print("Access token acquired successfully.\n")



### Execution

In [None]:
# -----------------------------------------------------------------------------
# Create Fabric items by uncommenting desired lines below
# -----------------------------------------------------------------------------

# Lakehouses (set enable_schemas=True for schema-enabled lakehouse)
# create_lakehouse(access_token, "lh_bronze", enable_schemas=True)
# create_lakehouse(access_token, "lh_silver", enable_schemas=True)
# create_lakehouse(access_token, "lh_gold", enable_schemas=True)

# Notebooks
# create_notebook(access_token, "notebook_name")

# Warehouses
# create_warehouse(access_token, "wh_name")

# Pipelines
# create_pipeline(access_token, "pl_name")

# Dataflows
# create_dataflow_gen2(access_token, "df_name")