# Mid-Level API: Composable Building Blocks

**Goal:** Build custom transfer workflows using composable components.

This API is for users who need more control than the High-Level API offers but don't want to handle low-level details.

## Why This Matters

- **Flexible**: Mix and match components for custom workflows
- **Reusable**: Build patterns once, use them everywhere  
- **Transparent**: See exactly what each step does
- **Composable**: Chain operations like LEGO blocks

## Real World Usage

This API suits data engineers who need custom pipelines but want to stay productive: build a pattern once, then reuse it everywhere.


In [11]:
from snowpark_db_api import TransferBuilder, ConnectionManager
from snowpark_db_api.transforms import Pipeline, SchemaTransform, QueryTransform, show_pipeline_steps

## 1. TransferBuilder Pattern

Build transfers step by step using a fluent interface. Each step is explicit and reusable.
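To make the fluent style concrete, here is a minimal, self-contained sketch of how such a builder can work: every configuration method returns `self`, so calls can be chained. `ToyTransferBuilder` and `with_step` are hypothetical names invented for this illustration; this is not how `snowpark_db_api.TransferBuilder` is implemented, only the general pattern.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyTransferBuilder:
    """Hypothetical stand-in for illustration; not the snowpark_db_api class."""
    source: Optional[str] = None
    destination: Optional[str] = None
    steps: List[str] = field(default_factory=list)

    def from_source(self, source: str) -> "ToyTransferBuilder":
        self.source = source
        return self  # returning self is what makes chaining possible

    def to_destination(self, destination: str) -> "ToyTransferBuilder":
        self.destination = destination
        return self

    def with_step(self, name: str) -> "ToyTransferBuilder":
        # Record the step name; a real builder would store a transform object.
        self.steps.append(name)
        return self

    def execute(self) -> bool:
        # A real builder would run the transfer; this only validates the plan.
        if not self.source or not self.destination:
            raise ValueError("source and destination must both be set")
        return True


builder = (ToyTransferBuilder()
           .from_source("dbo.Orders")
           .to_destination("ORDERS_COPY")
           .with_step("SchemaTransform")
           .with_step("QueryTransform"))
print(builder.steps)
print(builder.execute())
```

Because each call only records configuration, nothing runs until `execute()`, which is why a partially configured builder can be passed around and completed later.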


In [26]:
# Build a custom transfer workflow
result = (TransferBuilder()
         .from_source("(SELECT TOP 50 ID, Column0, Column1 FROM dbo.RandomDataWith100Columns) AS builder_test")
         .to_destination("BUILDER_TEST_TABLE") 
         .with_schema_mapping()  # Custom schema mapping
         .with_query_optimization()  # Query optimization
         .show_pipeline_steps(True)  # Show what's happening
         .execute())

print(f"Basic transfer result: {result}")

Pipeline Analysis
Pipeline has 2 transforms:
  1. SchemaTransform
  2. QueryTransform

Sample transformation with input: (SELECT TOP 1000 * FROM dbo.Orders WHERE orderdate >= '2023-01-01') AS recent_orders
  Step 1 (SchemaTransform): Schema mapping configured (3 columns)
  Step 2 (QueryTransform): Query processed → BUILDER_TEST_TABLE

Pipeline Analysis Complete
   All transforms are properly configured and ready for execution
🏗️ Executing custom pipeline: (SELECT TOP 1000 * FROM dbo.Orders WHERE orderdate >= '2023-01-01') AS recent_orders → BUILDER_TEST_TABLE
📋 Pipeline steps: ['SchemaTransform', 'QueryTransform']
2025-07-03 16:11:13 - snowpark_db_api.core - INFO - Setting up connections
2025-07-03 16:11:14 - snowpark_db_api.core - INFO - All connections established
2025-07-03 16:11:14 - snowpark_db_api.core - INFO - Starting transfer using custom query -> BUILDER_TEST_TABLE
2025-07-03 16:11:15 - snowpark_db_api.connections - INFO - Successfully connected to SQL Server: alingtestserver

In [13]:
# Custom query with pipeline optimization
result = (TransferBuilder()
    .from_source("(SELECT TOP 1000 * FROM dbo.RandomDataWith100Columns WHERE Column1 IS NOT NULL) AS sample_data")
    .to_destination("SAMPLE_DATA")
    .with_schema_mapping({'VARCHAR': 'STRING', 'DATETIME': 'TIMESTAMP'})
    .with_query_optimization()
    .with_environment('production')
    .show_pipeline_steps(True)
    .execute())

print(f"Custom pipeline result: {result}")

Pipeline Analysis
Pipeline has 3 transforms:
  1. SchemaTransform
  2. QueryTransform
  3. ConnectionTransform

Sample transformation with input: (SELECT TOP 1000 * FROM dbo.RandomDataWith100Columns WHERE Column1 IS NOT NULL) AS sample_data
  Step 1 (SchemaTransform): Schema mapping configured (3 columns)
  Step 2 (QueryTransform): Query processed → SAMPLE_DATA
  Step 3 (ConnectionTransform): Connection optimized → {'fetch_size': 1000, 'max_workers': 4, 'timeout': 300}

Pipeline Analysis Complete
   All transforms are properly configured and ready for execution
🏗️ Executing custom pipeline: (SELECT TOP 1000 * FROM dbo.RandomDataWith100Columns WHERE Column1 IS NOT NULL) AS sample_data → SAMPLE_DATA
📋 Pipeline steps: ['SchemaTransform', 'QueryTransform', 'ConnectionTransform']
2025-07-03 16:02:46 - snowpark_db_api.core - INFO - Setting up connections
2025-07-03 16:02:48 - snowpark_db_api.core - INFO - All connections established
2025-07-03 16:02:48 - snowpark_db_api.core - INFO - Startin

In [14]:
# Complex workflow with multiple configurations
result = (TransferBuilder()
         .from_source("(SELECT TOP 15 Id, FullName FROM dbo.UserProfile WHERE Id IS NOT NULL) AS advanced_test")
         .to_destination("ADVANCED_WORKFLOW_TABLE")
         .with_schema_mapping({'Id': 'USER_ID', 'FullName': 'FULL_NAME'})  # Custom mappings
         .with_query_optimization({'complexity': 'simple'})  # Optimization hints
         .with_environment('production')  # Production settings
         .show_pipeline_steps(True)  # Print each pipeline step
         .execute())

Pipeline Analysis
Pipeline has 3 transforms:
  1. SchemaTransform
  2. QueryTransform
  3. ConnectionTransform

Sample transformation with input: (SELECT TOP 15 Id, FullName FROM dbo.UserProfile WHERE Id IS NOT NULL) AS advanced_test
  Step 1 (SchemaTransform): Schema mapping configured (3 columns)
  Step 2 (QueryTransform): Query processed → ADVANCED_WORKFLOW_TABLE
  Step 3 (ConnectionTransform): Connection optimized → {'fetch_size': 1000, 'max_workers': 4, 'timeout': 300}

Pipeline Analysis Complete
   All transforms are properly configured and ready for execution
🏗️ Executing custom pipeline: (SELECT TOP 15 Id, FullName FROM dbo.UserProfile WHERE Id IS NOT NULL) AS advanced_test → ADVANCED_WORKFLOW_TABLE
📋 Pipeline steps: ['SchemaTransform', 'QueryTransform', 'ConnectionTransform']
2025-07-03 16:03:03 - snowpark_db_api.core - INFO - Setting up connections
2025-07-03 16:03:04 - snowpark_db_api.core - INFO - All connections established
2025-07-03 16:03:04 - snowpark_db_api.core - INFO

## 2. Connection Management

`ConnectionManager` gives you explicit control over database connections, which is useful for batch processing or long-running operations.
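One way to make the connect/close lifecycle harder to get wrong is to wrap it in a context manager. The sketch below assumes only that the manager object has a `connect()` method returning a truthy value on success and a `close()` method, as used in this section. `managed_connections` and `FakeManager` are hypothetical helpers written for this illustration; they are not part of `snowpark_db_api`.

```python
from contextlib import contextmanager


@contextmanager
def managed_connections(manager):
    """Connect on entry and always close on exit (hypothetical helper)."""
    if not manager.connect():
        raise ConnectionError("could not establish connections")
    try:
        yield manager
    finally:
        # Runs even if a transfer inside the with-block raises.
        manager.close()


class FakeManager:
    """Hypothetical test double so the sketch runs without a database."""

    def __init__(self):
        self.events = []

    def connect(self):
        self.events.append("connect")
        return True

    def execute_transfer(self, query):
        self.events.append(f"transfer:{query}")
        return True

    def close(self):
        self.events.append("close")


fake = FakeManager()
with managed_connections(fake) as m:
    m.execute_transfer("dbo.Orders")
print(fake.events)
```

With a real manager, the same wrapper would be used as `with managed_connections(ConnectionManager()) as manager:`, assuming its `connect()` and `close()` behave as they do in this section.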


In [6]:
# Test complete lifecycle
manager = ConnectionManager()

# Step 1: Connect
print("Step 1: Establishing connections...")
connect_success = manager.connect()
print(f"Connection established: {connect_success}")

if not connect_success:
    # Stop here: the remaining steps cannot run without connections
    raise RuntimeError("Failed to establish connections")

# Step 2: Test
print("Step 2: Testing connections...")
test_results = manager.test_connections()
print(f"Test results: {test_results}")

# Step 3: Execute multiple transfers
print("Step 3: Executing multiple transfers...")
transfers = [
    "(SELECT TOP 5 ID, Column0 FROM dbo.RandomDataWith100Columns) AS lifecycle_test1",
    "(SELECT TOP 5 Id, FullName FROM dbo.UserProfile WHERE Id IS NOT NULL) AS lifecycle_test2"
]

results = []
for transfer_query in transfers:
    result = manager.execute_transfer(transfer_query)
    results.append(result)
    print(f"Transfer result: {result}")

# Step 4: Clean up
print("Step 4: Cleaning up connections...")
manager.close()

overall_success = all(results)
print(f"Lifecycle test overall success: {overall_success}")

Step 1: Establishing connections...
🔌 Establishing database connections
2025-07-03 01:50:50 - snowpark_db_api.core - INFO - Setting up connections
2025-07-03 01:50:51 - snowpark_db_api.core - INFO - All connections established
Connection established: True
Step 2: Testing connections...
2025-07-03 01:50:52 - snowpark_db_api.connections - INFO - Successfully connected to SQL Server: alingtestserver.database.windows.net
✅ Source database connection OK
✅ Snowflake connection OK
Test results: {'source_db': True, 'snowflake': True}
Step 3: Executing multiple transfers...
2025-07-03 01:50:52 - snowpark_db_api.core - INFO - Starting transfer using custom query -> RANDOMDATAWITH100COLUMNS
2025-07-03 01:50:53 - snowpark_db_api.connections - INFO - Successfully connected to SQL Server: alingtestserver.database.windows.net
2025-07-03 01:50:54 - snowpark_db_api.connections - INFO - Successfully connected to SQL Server: alingtestserver.database.windows.net
2025-07-03 01:50:55 - snowpark_db_api.conne

## 4. Reusable Patterns

Build patterns once, use them everywhere. This is where the mid-level API really shines.
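A detail worth keeping in mind when sharing patterns is that a pattern function should hand out a fresh object on every call, so two transfers never share mutable state. The sketch below illustrates that idea with a frozen dataclass and `dataclasses.replace`. `TransferPlan` and `sql_server_pattern` are hypothetical names for this illustration and are not part of `snowpark_db_api`.

```python
from dataclasses import dataclass, field, replace
from typing import Dict


@dataclass(frozen=True)
class TransferPlan:
    """Hypothetical immutable description of one transfer (illustration only)."""
    type_mapping: Dict[str, str] = field(default_factory=dict)
    environment: str = "development"
    source: str = ""
    destination: str = ""


def sql_server_pattern() -> TransferPlan:
    # Called once per transfer, so each caller gets its own plan object.
    return TransferPlan(
        type_mapping={"VARCHAR": "STRING", "DATETIME": "TIMESTAMP"},
        environment="production",
    )


# Specialize the shared defaults without modifying them.
orders = replace(sql_server_pattern(), source="dbo.Orders", destination="ORDERS")
users = replace(sql_server_pattern(), source="dbo.UserProfile", destination="USERS")

print(orders.destination, users.destination)
print(orders.environment == users.environment)
```

The same principle applies to builder-based patterns: calling the factory function again for each transfer avoids one transfer's source or destination leaking into the next.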


In [13]:
# Define reusable transfer patterns
def create_sql_server_to_snowflake_pattern():
    """Standard pattern for SQL Server to Snowflake transfers"""
    return (TransferBuilder()
            .with_schema_mapping({
                'VARCHAR': 'STRING',
                'NVARCHAR': 'STRING', 
                'DATETIME': 'TIMESTAMP',
                'DATETIME2': 'TIMESTAMP',
                'BIT': 'BOOLEAN'
            })
            .with_environment('production')
            .show_pipeline_steps(True))

def create_batch_transfer_pattern(table_list):
    """Reusable pattern for batch transfers"""
    conn = ConnectionManager()
    try:
        if not conn.connect():
            raise RuntimeError("Failed to establish connections")
        results = []
        for table in table_list:
            result = conn.execute_transfer(table)
            results.append((table, result))
        return results
    finally:
        conn.close()  # Ensure cleanup happens

# Use the patterns
# Pattern 1: Standard transfer with custom schema mapping
standard_pattern = create_sql_server_to_snowflake_pattern()
result = (standard_pattern
    .from_source("dbo.Orders")
    .to_destination("ORDERS_STANDARDIZED")
    .execute())

print(f"Standard pattern result: {result}")

# Pattern 2: Batch processing
tables = [
    "dbo.Orders",
    "dbo.UserProfile",
    "(SELECT TOP 100 * FROM dbo.RandomDataWith100Columns) AS sample"
]

batch_results = create_batch_transfer_pattern(tables)
print(f"Batch results: {batch_results}")

print("♻️ Patterns can be reused across projects and teams!")


🔍 Pipeline Analysis
📋 Pipeline has 2 transforms:
  1. SchemaTransform
  2. ConnectionTransform

🧪 Sample transformation with input: dbo.Orders
  Step 1 (SchemaTransform): ✅ Schema mapping configured (3 columns)
  Step 2 (ConnectionTransform): ✅ Connection optimized → {'fetch_size': 1000, 'max_workers': 4, 'timeout': 300}

✅ Pipeline Analysis Complete
   All transforms are properly configured and ready for execution
🏗️ Executing custom pipeline: dbo.Orders → ORDERS_STANDARDIZED
📋 Pipeline steps: ['SchemaTransform', 'ConnectionTransform']
2025-07-03 01:55:16 - snowpark_db_api.core - INFO - Setting up connections
2025-07-03 01:55:17 - snowpark_db_api.core - INFO - All connections established
2025-07-03 01:55:17 - snowpark_db_api.core - INFO - Starting transfer: dbo.Orders -> ORDERS_STANDARDIZED
2025-07-03 01:55:18 - snowpark_db_api.connections - INFO - Successfully connected to SQL Server: alingtestserver.database.windows.net
2025-07-03 01:55:19 - snowpark_db_api.connections - INFO - Succ

## Summary: When to Use Mid-Level API

**Use this API when:**
- ✅ You need custom workflows but want to stay productive
- ✅ You want to build reusable patterns for your team
- ✅ You need explicit control over connections and batching
- ✅ You want to see what's happening in your pipelines
- ✅ You're building data engineering tools

**Don't use this API when:**
- ❌ You just want to copy data quickly (use High-Level API)
- ❌ You need to access raw Snowpark operations
- ❌ You want to customize every aspect of the transfer
- ❌ You're building low-level database tools

**What you learned:**
- `TransferBuilder` - Fluent interface for building transfers
- `ConnectionManager` - Explicit connection control and batching
- `Pipeline` - Composable transformation chains
- `show_pipeline_steps()` - Transparency into what's happening
- Reusable patterns for team productivity

**Next:** Check out the Low-Level API for complete control over transfers.
