# Azure Synapse Database Connections

This notebook demonstrates how to connect to various Azure Synapse database types:
- **Synapse Workspace** - Management and artifact operations
- **Synapse SQL Pools** - Dedicated and Serverless SQL databases
- **Synapse Spark/Lake Databases** - Delta Lake and Spark metastore
- **Linked Databases** - External linked services

## 1. Configuration

Set your Azure Synapse workspace details below:

In [None]:
# Configuration - Update these values for your environment
SYNAPSE_WORKSPACE_NAME = "your-synapse-workspace"
SYNAPSE_SQL_ENDPOINT = f"{SYNAPSE_WORKSPACE_NAME}.sql.azuresynapse.net"
SYNAPSE_DEV_ENDPOINT = f"https://{SYNAPSE_WORKSPACE_NAME}.dev.azuresynapse.net"
SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "your-resource-group"

# Database names
DEDICATED_SQL_POOL = "your_dedicated_pool"  # Dedicated SQL Pool name
SERVERLESS_DATABASE = "your_database"        # Serverless SQL database
LAKE_DATABASE = "your_lake_db"               # Lake database name

# Azure Data Lake Storage (for Lake databases)
ADLS_ACCOUNT = "your_storage_account"
ADLS_CONTAINER = "your_container"

In [None]:
# Configuration - Azure Government Cloud Synapse Workspace
SYNAPSE_WORKSPACE_NAME = "synapsedemo"
CLOUD_SUFFIX = "usgovcloudapi.net"

# Endpoints from Azure Portal
SYNAPSE_SQL_ENDPOINT = f"synapsedemo.sql.azuresynapse.{CLOUD_SUFFIX}"
SYNAPSE_SERVERLESS_ENDPOINT = f"synapsedemo-ondemand.sql.azuresynapse.{CLOUD_SUFFIX}"
SYNAPSE_DEV_ENDPOINT = f"https://synapsedemo.dev.azuresynapse.{CLOUD_SUFFIX}"
SYNAPSE_DEDICATED_ENDPOINT = f"synapsedemo.sql.azuresynapse.{CLOUD_SUFFIX}"

# Subscription and Resource Group
SUBSCRIPTION_ID = "6b71afbc-6cbe-4446-948c-3c45a4b4f160"
RESOURCE_GROUP = "synapse_demo"

# Dedicated SQL Pool
DEDICATED_SQL_POOL = "demo_dedicated_pool"

# Lake Databases (accessed via Serverless SQL)
LAKE_DATABASES = [
    "Database_all",
    "default", 
    "demo_lake_db",
    "staging_db",
]
LAKE_DATABASE = "demo_lake_db"  # Default lake database to connect to

# Serverless SQL database
SERVERLESS_DATABASE = "master"

# Azure Data Lake Storage (Primary ADLS Gen2 from workspace)
ADLS_ACCOUNT = "synapsedatalake"
ADLS_FILESYSTEM = "synapsefilesystem"
ADLS_ENDPOINT = f"https://{ADLS_ACCOUNT}.dfs.core.{CLOUD_SUFFIX}"

## 2. Authentication Setup

Using Azure Identity for authentication. This supports multiple authentication methods:
- Azure CLI (`az login`)
- Environment variables
- Managed Identity (when running in Azure)
- Interactive browser login

In [None]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential, AzureAuthorityHosts
import struct

# Use Azure Government Cloud authority
AZURE_GOV_AUTHORITY = AzureAuthorityHosts.AZURE_GOVERNMENT

# Use DefaultAzureCredential for flexible authentication with Azure Government
credential = DefaultAzureCredential(authority=AZURE_GOV_AUTHORITY)

# For interactive authentication (uncomment if needed):
# credential = InteractiveBrowserCredential(authority=AZURE_GOV_AUTHORITY)

# Azure Government SQL Database resource endpoint
SQL_RESOURCE = "https://database.usgovcloudapi.net/.default"

def get_sql_access_token():
    """Get an access token for Azure SQL Database authentication (Azure Government)."""
    token = credential.get_token(SQL_RESOURCE)
    # Format token for pyodbc
    token_bytes = token.token.encode("UTF-16-LE")
    token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
    return token_struct

print("✓ Credentials initialized for Azure Government Cloud")

## 3. Connect to Synapse Workspace

Access workspace artifacts like pipelines, datasets, linked services, and notebooks.

In [None]:
from azure.synapse.artifacts import ArtifactsClient

# Create the Artifacts client for workspace operations
artifacts_client = ArtifactsClient(
    credential=credential,
    endpoint=SYNAPSE_DEV_ENDPOINT
)

# List linked services in the workspace
print("Linked Services in workspace:")
print("-" * 40)
for ls in artifacts_client.linked_service.get_linked_services_by_workspace():
    print(f"  • {ls.name} ({ls.type})")

# List datasets
print("\nDatasets in workspace:")
print("-" * 40)
for ds in artifacts_client.dataset.get_datasets_by_workspace():
    print(f"  • {ds.name}")

## 4. Connect to Synapse SQL Databases

### 4.1 Serverless SQL Pool (Built-in)

Connect to the serverless SQL pool for on-demand queries.

In [None]:
import pyodbc
import pandas as pd

def get_synapse_connection(database: str, server: str = None) -> pyodbc.Connection:
    """
    Create a connection to Synapse SQL using Azure AD authentication.
    
    Args:
        database: The database name (use 'master' for serverless built-in)
        server: The SQL endpoint (defaults to SYNAPSE_SQL_ENDPOINT)
    
    Returns:
        pyodbc.Connection object
    """
    server = server or SYNAPSE_SQL_ENDPOINT
    
    # Connection string for Azure AD token authentication
    conn_str = (
        f"Driver={{ODBC Driver 18 for SQL Server}};"
        f"Server={server};"
        f"Database={database};"
        f"Encrypt=yes;"
        f"TrustServerCertificate=no;"
    )
    
    # Get access token and connect
    token = get_sql_access_token()
    conn = pyodbc.connect(conn_str, attrs_before={1256: token})
    
    return conn

# Connect to Serverless SQL Pool (built-in)
serverless_conn = get_synapse_connection(database="master")
print("✓ Connected to Serverless SQL Pool")

# List all databases
query = "SELECT name, database_id FROM sys.databases ORDER BY name"
df_databases = pd.read_sql(query, serverless_conn)
print("\nDatabases available:")
print(df_databases.to_string(index=False))

### 4.2 Dedicated SQL Pool

Connect to a provisioned dedicated SQL pool for data warehousing workloads.

In [None]:
# Connect to Dedicated SQL Pool
dedicated_conn = get_synapse_connection(database=DEDICATED_SQL_POOL)
print(f"✓ Connected to Dedicated SQL Pool: {DEDICATED_SQL_POOL}")

# List schemas and tables in the dedicated pool
query = """
SELECT 
    s.name AS schema_name,
    t.name AS table_name,
    p.rows AS row_count
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
INNER JOIN sys.partitions p ON t.object_id = p.object_id AND p.index_id IN (0, 1)
ORDER BY s.name, t.name
"""
df_tables = pd.read_sql(query, dedicated_conn)
print(f"\nTables in {DEDICATED_SQL_POOL}:")
print(df_tables.to_string(index=False))

## 5. Connect to Lake Databases

Lake databases in Synapse provide a metadata layer over data stored in Azure Data Lake Storage. You can query them via the serverless SQL pool.

In [None]:
# Connect to a Lake Database via Serverless SQL (on-demand endpoint)
# Lake databases MUST use the serverless/on-demand endpoint
lake_conn = get_synapse_connection(database=LAKE_DATABASE, server=SYNAPSE_SERVERLESS_ENDPOINT)
print(f"✓ Connected to Lake Database: {LAKE_DATABASE}")

# List tables in the Lake Database
query = """
SELECT 
    s.name AS schema_name,
    t.name AS table_name,
    t.type_desc
FROM sys.tables t
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
ORDER BY s.name, t.name
"""
df_lake_tables = pd.read_sql(query, lake_conn)
print(f"\nTables in Lake Database '{LAKE_DATABASE}':")
print(df_lake_tables.to_string(index=False))

In [None]:
# Query data directly from Azure Data Lake using OPENROWSET
# This works with Parquet, Delta, CSV, and JSON files

query_parquet = f"""
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://{ADLS_ACCOUNT}.dfs.core.windows.net/{ADLS_CONTAINER}/path/to/data/*.parquet',
    FORMAT = 'PARQUET'
) AS data
"""

# Query Delta Lake tables
query_delta = f"""
SELECT TOP 100 *
FROM OPENROWSET(
    BULK 'https://{ADLS_ACCOUNT}.dfs.core.windows.net/{ADLS_CONTAINER}/path/to/delta_table/',
    FORMAT = 'DELTA'
) AS data
"""

# Example: Run the parquet query (uncomment when paths are configured)
# df_parquet = pd.read_sql(query_parquet, serverless_conn)
# print(df_parquet.head())

print("✓ OPENROWSET queries ready - update paths and uncomment to execute")

## 6. Query Linked Databases

Access external databases through linked services configured in Synapse.

In [None]:
# List all linked services and their types
print("Linked Services available for external database access:")
print("-" * 60)

for ls in artifacts_client.linked_service.get_linked_services_by_workspace():
    ls_detail = artifacts_client.linked_service.get_linked_service(ls.name)
    ls_type = ls_detail.properties.type if hasattr(ls_detail.properties, 'type') else 'Unknown'
    print(f"  • {ls.name}")
    print(f"    Type: {ls_type}")
    print()

In [None]:
# Query external SQL Server or Azure SQL via linked service
# Using sp_execute_remote or three-part naming with external tables

# Example: Create an external data source (run once in SQL)
create_external_source = """
-- Run this in Synapse SQL to create external data source
CREATE EXTERNAL DATA SOURCE LinkedAzureSQL
WITH (
    TYPE = RDBMS,
    LOCATION = 'your-azure-sql-server.database.windows.net',
    DATABASE_NAME = 'your_database',
    CREDENTIAL = YourCredential
);
"""

# Example: Query via external table
query_external = """
-- Query external table (after creating external table definition)
SELECT TOP 100 * 
FROM [ExternalSchema].[ExternalTable]
"""

# Example: Using three-part naming for linked databases
query_linked = """
-- For databases linked via Synapse Link or external tables
SELECT *
FROM [LinkedDatabase].[schema].[table]
"""

print("✓ External query templates ready")
print("  Configure external data sources and credentials in Synapse Studio")

## 7. Using SQLAlchemy for ORM-style Access

SQLAlchemy provides a more Pythonic interface for database operations.

In [None]:
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

def create_synapse_engine(database: str, server: str = None):
    """
    Create a SQLAlchemy engine for Synapse SQL with Azure AD authentication.
    
    Args:
        database: The database name
        server: The SQL endpoint (defaults to SYNAPSE_SQL_ENDPOINT)
    
    Returns:
        SQLAlchemy Engine object
    """
    server = server or SYNAPSE_SQL_ENDPOINT
    
    # Build connection URL
    connection_url = URL.create(
        "mssql+pyodbc",
        query={
            "driver": "ODBC Driver 18 for SQL Server",
            "Encrypt": "yes",
            "TrustServerCertificate": "no",
        }
    )
    
    # Create engine with token-based authentication
    def get_conn():
        return get_synapse_connection(database, server)
    
    engine = create_engine(
        f"mssql+pyodbc://@{server}/{database}?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=yes",
        creator=get_conn
    )
    
    return engine

# Create engine for serverless pool
engine = create_synapse_engine("master")

# Test the connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT @@VERSION AS version"))
    version = result.fetchone()
    print(f"✓ SQLAlchemy connection successful")
    print(f"  SQL Server Version: {version[0][:50]}...")

## 8. Synapse Management Operations

Use the Azure Management SDK to manage Synapse resources programmatically.

In [None]:
from azure.mgmt.synapse import SynapseManagementClient
from azure.identity import DefaultAzureCredential, AzureAuthorityHosts

# Azure Government management endpoint
AZURE_GOV_MGMT_URL = "https://management.usgovcloudapi.net"

# Create a credential specifically for Azure Government management API
# Need to get token with correct audience/scope
mgmt_credential = DefaultAzureCredential(
    authority=AzureAuthorityHosts.AZURE_GOVERNMENT,
    exclude_shared_token_cache_credential=True
)

# Create management client for Azure Government
mgmt_client = SynapseManagementClient(
    credential=mgmt_credential,
    subscription_id=SUBSCRIPTION_ID,
    base_url=AZURE_GOV_MGMT_URL,
    credential_scopes=["https://management.usgovcloudapi.net/.default"]
)

# List SQL pools in the workspace
print(f"SQL Pools in workspace '{SYNAPSE_WORKSPACE_NAME}':")
print("-" * 50)

# List Dedicated SQL Pools
for pool in mgmt_client.sql_pools.list_by_workspace(RESOURCE_GROUP, SYNAPSE_WORKSPACE_NAME):
    print(f"  • {pool.name}")
    print(f"    SKU: {pool.sku.name if pool.sku else 'N/A'}")
    print(f"    Status: {pool.status}")
    print()

# Get workspace details
workspace = mgmt_client.workspaces.get(RESOURCE_GROUP, SYNAPSE_WORKSPACE_NAME)
print(f"Workspace Details:")
print(f"  Name: {workspace.name}")
print(f"  Location: {workspace.location}")
print(f"  SQL Admin: {workspace.sql_administrator_login}")
print(f"  Dev Endpoint: {workspace.connectivity_endpoints.get('dev', 'N/A')}")

## 9. Helper Functions

Utility functions for common database operations.

In [None]:
def list_all_databases(conn: pyodbc.Connection) -> pd.DataFrame:
    """List all databases in the Synapse workspace."""
    query = """
    SELECT 
        name,
        database_id,
        create_date,
        collation_name
    FROM sys.databases
    ORDER BY name
    """
    return pd.read_sql(query, conn)


def list_tables(conn: pyodbc.Connection, schema: str = None) -> pd.DataFrame:
    """List all tables, optionally filtered by schema."""
    query = """
    SELECT 
        s.name AS schema_name,
        t.name AS table_name,
        t.type_desc,
        t.create_date,
        t.modify_date
    FROM sys.tables t
    INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
    """
    if schema:
        query += f" WHERE s.name = '{schema}'"
    query += " ORDER BY s.name, t.name"
    return pd.read_sql(query, conn)


def list_views(conn: pyodbc.Connection, schema: str = None) -> pd.DataFrame:
    """List all views, optionally filtered by schema."""
    query = """
    SELECT 
        s.name AS schema_name,
        v.name AS view_name,
        v.create_date,
        v.modify_date
    FROM sys.views v
    INNER JOIN sys.schemas s ON v.schema_id = s.schema_id
    """
    if schema:
        query += f" WHERE s.name = '{schema}'"
    query += " ORDER BY s.name, v.name"
    return pd.read_sql(query, conn)


def get_table_columns(conn: pyodbc.Connection, table_name: str, schema: str = "dbo") -> pd.DataFrame:
    """Get column information for a specific table."""
    query = f"""
    SELECT 
        c.name AS column_name,
        t.name AS data_type,
        c.max_length,
        c.precision,
        c.scale,
        c.is_nullable,
        c.is_identity
    FROM sys.columns c
    INNER JOIN sys.types t ON c.user_type_id = t.user_type_id
    INNER JOIN sys.tables tbl ON c.object_id = tbl.object_id
    INNER JOIN sys.schemas s ON tbl.schema_id = s.schema_id
    WHERE tbl.name = '{table_name}' AND s.name = '{schema}'
    ORDER BY c.column_id
    """
    return pd.read_sql(query, conn)


def run_query(conn: pyodbc.Connection, query: str) -> pd.DataFrame:
    """Execute a SQL query and return results as DataFrame."""
    return pd.read_sql(query, conn)


print("✓ Helper functions loaded")

## 10. Cleanup

Close database connections when finished.

In [None]:
# Close all connections
connections_to_close = [
    ('serverless_conn', 'Serverless SQL Pool'),
    ('dedicated_conn', 'Dedicated SQL Pool'),
    ('lake_conn', 'Lake Database'),
]

for conn_var, conn_name in connections_to_close:
    if conn_var in dir() and locals().get(conn_var):
        try:
            locals()[conn_var].close()
            print(f"✓ Closed {conn_name} connection")
        except Exception as e:
            print(f"⚠ Error closing {conn_name}: {e}")

print("\n✓ Cleanup complete")