# Logic App Trigger with DB Helper

This notebook demonstrates how to refactor database connection code to use the `db_helper` module for retrieving bearer tokens and triggering Logic App flows.

**Key Improvements:**
- ✅ Uses existing `db_helper` infrastructure
- ✅ Leverages `config.yaml` for database configuration
- ✅ Eliminates hardcoded connection strings
- ✅ Better error handling and logging
- ✅ More maintainable and reusable code

**Original vs Refactored:**
- **Before**: Manual `pyodbc` connection with hardcoded credentials
- **After**: Clean `db_helper.run_query()` with configuration management

In [None]:
# Import required libraries and db_helper module
import sys
import requests
from pathlib import Path
import pandas as pd
from datetime import datetime

# Locate the repository root: the nearest directory, starting at the current
# working directory and walking upwards, that contains utils/db_helper.py.
# If no such directory exists the filesystem root is used as the fallback.
_start_dir = Path().resolve()
_search_dirs = [_start_dir, *_start_dir.parents]
repo_root = next(
    (folder for folder in _search_dirs if (folder / "utils" / "db_helper.py").exists()),
    _search_dirs[-1],
)

# Put <repo_root>/utils at the front of the import path so db_helper resolves
_utils_dir = repo_root / "utils"
sys.path.insert(0, str(_utils_dir))

# Project-local helper module found via the path entry added above
import db_helper

print("‚úÖ Successfully imported db_helper and required libraries")
print(f"üìÅ Repository root: {repo_root}")

‚úÖ Successfully imported db_helper and required libraries
üìÅ Repository root: C:\Users\AUKALATC01\GitHub\data-orchestration\data-orchestration


In [None]:
# Load the database configuration and confirm the 'orders' database answers a query
try:
    # The helper returns a mapping of configuration key -> settings mapping
    db_config = db_helper.get_database_config()

    print("üìã Available database configurations:")
    for db_key, db_info in db_config.items():
        host_label = db_info.get('host', 'N/A')
        database_label = db_info.get('database', 'N/A')
        print(f"  ‚Ä¢ {db_key.upper()}: {host_label} -> {database_label}")

    # Smoke-test the orders database (assumed to be where bearer tokens are stored)
    if 'orders' not in db_config:
        print("\n‚ö†Ô∏è  'orders' database configuration not found")
        print("Available databases:", list(db_config.keys()))
    else:
        test_query = "SELECT 1 as connection_test"
        test_result = db_helper.run_query(test_query, 'orders')
        print("\n‚úÖ Database connection test successful!")
        print(f"üìä Connected to: {db_config['orders']['host']}")

except Exception as e:
    # Any failure (loading the config or running the probe query) lands here
    print(f"‚ùå Database configuration error: {e}")
    print("üí° Make sure config.yaml exists with database configurations")

In [None]:
# Retrieve the newest unexpired bearer token using db_helper
# (replaces the manual pyodbc connection).
#
# Result: `bearer_token` holds the token string on success, or None when the
# query fails or no unexpired token exists, so later cells can test it.
try:
    print("üîç Retrieving bearer token from database...")

    # Most recently retrieved token whose expiry is still in the future,
    # as judged by the database server's clock (GETDATE()).
    token_query = """
    SELECT TOP 1 
        bearerToken,
        expiresOn,
        dateRetrieved
    FROM log.BearerTokens
    WHERE expiresOn > GETDATE()   -- token not yet expired
    ORDER BY dateRetrieved DESC;  -- most recent first
    """

    # This replaces the entire manual pyodbc connection block!
    token_df = db_helper.run_query(token_query, 'orders')

    if token_df.empty:
        # Raised only to reach the handler below with a clear message
        raise LookupError("No valid (unexpired) bearer token found in database.")

    # Extract token information from the single returned row
    latest_token = token_df.iloc[0]
    bearer_token = latest_token['bearerToken']
    expires_on = latest_token['expiresOn']
    date_retrieved = latest_token['dateRetrieved']

    print("‚úÖ Successfully retrieved bearer token!")
    print(f"üìÖ Token expires: {expires_on}")
    print(f"üïí Retrieved: {date_retrieved}")
    # Security: never echo token material; notebook outputs are saved and
    # shared with the .ipynb file. Report only the length.
    print(f"üîë Token loaded ({len(bearer_token)} characters; value not displayed)")

except Exception as e:
    print(f"‚ùå Failed to retrieve bearer token: {e}")
    bearer_token = None

In [None]:
# Configure Logic App request parameters.
# Defines `logic_app_url`, `headers` and `request_body` for the next cell;
# nothing is defined when no bearer token is available.
if bearer_token:
    print("üîß Configuring Logic App request...")

    # Logic App endpoint URL
    logic_app_url = (
        "https://prod-27.australiasoutheast.logic.azure.com:443/"
        "workflows/f6b302ba68c040619502cbf79e89d853/triggers/"
        "manual/paths/invoke?api-version=2016-06-01"
    )

    # Build authorization headers
    headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json",
        "User-Agent": "DataOrchestration-Notebook/1.0"
    }

    # Optional: Add request body for Logic App (if needed)
    request_body = {
        "trigger_source": "jupyter_notebook",
        "timestamp": datetime.now().isoformat(),  # local time, no timezone info
        "description": "Orders to Blob trigger via db_helper"
    }

    print(f"üéØ Target URL: {logic_app_url}")
    # Security: do not print any part of the token; cell output is persisted
    # in the notebook file.
    print("üîë Authorization: Bearer <redacted>")
    print("üì¶ Request body configured")

else:
    print("‚ùå Cannot configure Logic App request - no bearer token available")

In [None]:
# Send the authorized POST to the Logic App and record the outcome.
# On success `status_code`, `response_text` and `response_headers` are set;
# on any failure `status_code` is None and `response_text` holds the reason.
if not bearer_token:
    print("‚ùå Cannot execute Logic App request - no bearer token available")
else:
    try:
        print("üöÄ Triggering Logic App...")

        # POST the JSON body with the prepared headers; give up after 30 seconds
        response = requests.post(
            logic_app_url,
            headers=headers,
            json=request_body,
            timeout=30,
        )

        # Keep the pieces of the response that the next cell inspects
        status_code, response_text = response.status_code, response.text
        response_headers = dict(response.headers)

        print(f"üì§ Request sent to Logic App")
        print(f"‚è±Ô∏è  Response time: {response.elapsed.total_seconds():.2f} seconds")
        print(f"üìä Status Code: {status_code}")

    except requests.exceptions.Timeout:
        # Must precede RequestException, of which Timeout is a subclass
        print("‚è∞ Request timed out after 30 seconds")
        status_code, response_text = None, "Request timeout"

    except requests.exceptions.RequestException as err:
        print(f"üåê Network error: {err}")
        status_code, response_text = None, str(err)

    except Exception as err:
        print(f"‚ùå Unexpected error: {err}")
        status_code, response_text = None, str(err)

In [None]:
# Handle response and error checking.
# Reports the outcome of the Logic App call and, on success, writes a
# best-effort audit row to log.LogicAppTriggers.
if bearer_token and 'status_code' in locals():
    print("\n" + "="*60)
    print("üìã LOGIC APP TRIGGER RESULTS")
    print("="*60)

    if status_code and status_code in (200, 202):
        print("‚úÖ SUCCESS! Logic App triggered successfully")
        print(f"üìä Status Code: {status_code}")
        print(f"üéØ Logic App Response: {response_text[:200]}{'...' if len(response_text) > 200 else ''}")

        # Log success to database (optional)
        try:
            # Values are bound as parameters rather than interpolated into the
            # SQL text: response_text comes from the remote service and may
            # contain quotes, which would break or inject into the statement.
            # The `params=` keyword mirrors the other db_helper.execute call
            # in this notebook.
            success_log = """
            INSERT INTO log.LogicAppTriggers (
                trigger_date, 
                status_code, 
                response_text, 
                bearer_token_used,
                trigger_source
            ) VALUES (
                GETDATE(), 
                ?, 
                ?, 
                ?,
                'jupyter_notebook_db_helper'
            )
            """
            db_helper.execute(
                success_log,
                'orders',
                params=(status_code, response_text[:500], bearer_token[:50]),
            )
            print("üìù Success logged to database")
        except Exception as db_error:
            # Logging is best-effort, but report why it failed instead of
            # swallowing everything (a bare except also traps KeyboardInterrupt).
            print(f"‚ö†Ô∏è  Could not log to database (table may not exist): {db_error}")

    else:
        print("‚ùå FAILED! Logic App trigger unsuccessful")
        if status_code:
            print(f"üìä Status Code: {status_code}")
        print(f"üîç Error Details: {response_text}")
        print("\nüí° Troubleshooting tips:")
        print("  ‚Ä¢ Check bearer token validity")
        print("  ‚Ä¢ Verify Logic App URL")
        print("  ‚Ä¢ Check network connectivity")
        print("  ‚Ä¢ Review Azure Logic App logs")

else:
    print("‚ùå Logic App trigger not executed - check previous steps for errors")

print("\n" + "="*60)
print("üéØ SUMMARY: Refactored notebook using db_helper")
print("‚úÖ No hardcoded database credentials")
print("‚úÖ Leveraged existing configuration system") 
print("‚úÖ Cleaner, more maintainable code")
print("‚úÖ Better error handling and logging")
print("="*60)

# Power Automate Flow HTTP Trigger Authentication

The bearer token approach failed because **Power Automate flows** (not Logic Apps) use different authentication methods for manual HTTP triggers.

## Power Automate Authentication Options:

### Option 1: **No Authentication Required** (Most Common)
- Power Automate flows can be configured to accept "Anyone" triggers
- No authentication headers needed
- Simple POST request to the trigger URL

### Option 2: **Basic Authentication with Flow Owner Credentials**
- Uses the flow owner's username/password
- Encoded in Basic Auth header

### Option 3: **API Key in Headers**
- Custom API key defined in the flow
- Passed as a custom header

## Your Flow URL Analysis:
Your URL: `https://prod-27.australiasoutheast.logic.azure.com:443/workflows/f6b302ba68c040619502cbf79e89d853/triggers/manual/paths/invoke?api-version=2016-06-01`

This appears to be a **Power Automate flow** trigger URL (even though it says logic.azure.com - that's the underlying infrastructure).

## Implementation Below:
Let's test different authentication approaches for Power Automate flows.

In [None]:
# Power Automate Flow Trigger Implementation using db_helper

def trigger_power_automate_flow_no_auth(flow_url, payload=None):
    """
    Trigger Power Automate flow with no authentication headers (most common).

    Any access control is expected to live in the URL itself (for example a
    `sig=` query parameter), so the query string is never written to the log.

    Args:
        flow_url: Power Automate flow trigger URL
        payload: Optional JSON payload for the request; a default payload
            describing the notebook trigger is sent when omitted

    Returns:
        The response object from `requests.post` (for any HTTP status), or
        None if sending the request raised an exception.
    """
    print("[CONFIG] Triggering Power Automate flow with no authentication...")
    
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'DataOrchestration-Notebook/1.0'
    }
    
    if payload is None:
        payload = {
            "trigger_source": "jupyter_notebook",
            "timestamp": datetime.now().isoformat(),
            "description": "Power Automate flow trigger from data orchestration"
        }
    
    try:
        print("[REQUEST] Sending POST request to Power Automate flow...")
        # Security: log only the part of the URL before '?'. The query string
        # can carry the access signature, and notebook output is persisted.
        base_url, has_query, _ = str(flow_url).partition("?")
        print(f"[URL] {base_url}{'?<redacted>' if has_query else ''}")
        
        response = requests.post(flow_url, json=payload, headers=headers, timeout=30)
        
        print(f"[RESPONSE] Status Code: {response.status_code}")
        print(f"[RESPONSE] Response time: {response.elapsed.total_seconds():.2f} seconds")
        print(f"[RESPONSE] Response: {response.text[:200]}{'...' if len(response.text) > 200 else ''}")
        
        # Power Automate flows typically return 202 (Accepted) for successful triggers
        success = response.status_code in [200, 202]
        
        if success:
            print("[SUCCESS] Power Automate flow triggered successfully!")
        else:
            print(f"[WARNING] Unexpected status code: {response.status_code}")
            
        return response
        
    except Exception as e:
        # Covers network failures and timeouts; callers detect this via None
        print(f"[ERROR] Failed to trigger Power Automate flow: {e}")
        return None

def trigger_power_automate_flow_basic_auth(flow_url, username, password, payload=None):
    """
    POST to a Power Automate flow trigger using an HTTP Basic Authorization header.

    Args:
        flow_url: Power Automate flow trigger URL
        username: Flow owner's username (email)
        password: Flow owner's password
        payload: Optional JSON payload for the request; a default payload
            is built when omitted

    Returns:
        The response object from `requests.post` (for any HTTP status), or
        None if sending the request raised an exception.
    """
    print("[CONFIG] Triggering Power Automate flow with basic authentication...")

    import base64

    # "user:password" -> base64 text, as used after the "Basic " scheme prefix
    raw_pair = f"{username}:{password}".encode()
    encoded_pair = base64.b64encode(raw_pair).decode()

    request_headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Basic {encoded_pair}',
        'User-Agent': 'DataOrchestration-Notebook/1.0'
    }

    # Fall back to a descriptive default body when the caller gave none
    body = payload if payload is not None else {
        "trigger_source": "jupyter_notebook_basic_auth",
        "timestamp": datetime.now().isoformat(),
        "description": "Power Automate flow trigger with basic auth"
    }

    try:
        print("[REQUEST] Sending authenticated POST request to Power Automate flow...")
        reply = requests.post(flow_url, json=body, headers=request_headers, timeout=30)

        # Show at most the first 200 characters of the response body
        ellipsis = '...' if len(reply.text) > 200 else ''
        print(f"[RESPONSE] Status Code: {reply.status_code}")
        print(f"[RESPONSE] Response time: {reply.elapsed.total_seconds():.2f} seconds")
        print(f"[RESPONSE] Response: {reply.text[:200]}{ellipsis}")

        # 200 and 202 are the statuses treated as a successful trigger
        if reply.status_code in [200, 202]:
            print("[SUCCESS] Power Automate flow triggered successfully with basic auth!")
        else:
            print(f"[WARNING] Basic auth failed with status: {reply.status_code}")

        return reply

    except Exception as err:
        print(f"[ERROR] Failed to trigger Power Automate flow with basic auth: {err}")
        return None

def trigger_power_automate_flow_api_key(flow_url, api_key, key_header_name="X-API-Key", payload=None):
    """
    POST to a Power Automate flow trigger, passing an API key in a request header.

    Args:
        flow_url: Power Automate flow trigger URL
        api_key: API key for authentication
        key_header_name: Header name for the API key (default: X-API-Key)
        payload: Optional JSON payload for the request; a default payload
            is built when omitted

    Returns:
        The response object from `requests.post` (for any HTTP status), or
        None if sending the request raised an exception.
    """
    print(f"[CONFIG] Triggering Power Automate flow with API key in {key_header_name} header...")

    # Key order matters if key_header_name collides with a fixed header name
    request_headers = {
        'Content-Type': 'application/json',
        key_header_name: api_key,
        'User-Agent': 'DataOrchestration-Notebook/1.0'
    }

    # Fall back to a descriptive default body when the caller gave none
    body = payload if payload is not None else {
        "trigger_source": "jupyter_notebook_api_key",
        "timestamp": datetime.now().isoformat(),
        "description": "Power Automate flow trigger with API key"
    }

    try:
        print("[REQUEST] Sending API key authenticated POST request...")
        reply = requests.post(flow_url, json=body, headers=request_headers, timeout=30)

        # Show at most the first 200 characters of the response body
        ellipsis = '...' if len(reply.text) > 200 else ''
        print(f"[RESPONSE] Status Code: {reply.status_code}")
        print(f"[RESPONSE] Response time: {reply.elapsed.total_seconds():.2f} seconds")
        print(f"[RESPONSE] Response: {reply.text[:200]}{ellipsis}")

        # 200 and 202 are the statuses treated as a successful trigger
        if reply.status_code in [200, 202]:
            print("[SUCCESS] Power Automate flow triggered successfully with API key!")
        else:
            print(f"[WARNING] API key auth failed with status: {reply.status_code}")

        return reply

    except Exception as err:
        print(f"[ERROR] Failed to trigger Power Automate flow with API key: {err}")
        return None

# Confirm that the three trigger helpers above are now defined
for _ready_line in (
    "[READY] Power Automate flow trigger functions defined!",
    "Next: Test different authentication methods with your flow",
):
    print(_ready_line)

In [None]:
# Demo: Test Power Automate Flow with SAS Authentication (No Auth Required)
# Builds the signed trigger URL and a test payload, then fires the flow once
# through trigger_power_automate_flow_no_auth; the result is kept in `response`.
print("="*60)
print("POWER AUTOMATE FLOW TRIGGER DEMONSTRATION")
print("="*60)

# Your Power Automate flow URL with SAS signature (no additional auth required)
# NOTE(review): the `sig=` value below is an access credential committed in
# source. Consider loading the URL from configuration or the environment and
# regenerating the signature - confirm with the flow owner.
power_automate_url = (
    "https://prod-27.australiasoutheast.logic.azure.com:443/"
    "workflows/f6b302ba68c040619502cbf79e89d853/triggers/"
    "manual/paths/invoke?api-version=2016-06-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=HIXJekmqnQhsarXH5w8lbDDtzLt1qR7qOWy0HudZ4IM"
)

# Only the first 100 characters are echoed
print(f"[URL] Power Automate Flow URL: {power_automate_url[:100]}...")
print("[AUTH] SAS signature detected in URL - no additional authentication required")

# Test payload for the flow
test_payload = {
    "trigger_source": "notebook_test",
    "timestamp": datetime.now().isoformat(),
    "test_data": "Testing Power Automate flow trigger from Jupyter notebook",
    "environment": "development",
    "user": "data_orchestration_team",
    "flow_type": "dataflow_refresh"
}

# Size of the payload's Python repr, not of the JSON that is sent
print(f"[PAYLOAD] Test payload prepared: {len(str(test_payload))} characters")

# Test the Power Automate flow with SAS authentication (no auth headers needed)
print("\n" + "="*40)
print("TESTING POWER AUTOMATE FLOW")
print("="*40)
print("[TEST] Testing Power Automate flow with SAS signature...")
print("[INFO] URL contains 'sig=' parameter - no additional authentication headers needed")

# `response` is the requests response object, or None if the call raised
response = trigger_power_automate_flow_no_auth(power_automate_url, test_payload)

# Analyze the response
# `response` is either a requests response object or None (request never
# completed). Report the outcome and, on success, write a best-effort audit row.
print("\n" + "="*60)
print("POWER AUTOMATE FLOW TEST RESULTS")
print("="*60)

# Compare against None explicitly: a requests response is falsy for any
# 4xx/5xx status, so a plain `if response:` would skip the 401/404 branches
# below and misreport HTTP errors as "no response received".
if response is not None:
    status_code = response.status_code
    response_text = response.text
    
    if status_code in [200, 202]:
        print("[RESULT] ‚úÖ SUCCESS! Power Automate flow triggered successfully")
        print(f"[STATUS] Status Code: {status_code}")
        print(f"[RESPONSE] Response: {response_text[:200]}{'...' if len(response_text) > 200 else ''}")
        
        # Try to log success to database
        try:
            success_log = """
            INSERT INTO log.PowerAutomateFlowTriggers (
                trigger_date, 
                status_code, 
                response_text, 
                flow_url,
                trigger_source,
                success
            ) VALUES (
                GETDATE(), 
                ?, 
                ?, 
                ?,
                'jupyter_notebook_sas',
                1
            )
            """
            # Store the URL without its query string so the SAS signature
            # (`sig=`) is never written to the log table.
            logged_url = power_automate_url.split("?", 1)[0][:200]
            db_helper.execute(success_log, 'orders', params=(status_code, response_text[:500], logged_url))
            print("[DATABASE] Success logged to database")
        except Exception as db_error:
            print(f"[WARNING] Could not log to database (table may not exist): {db_error}")
        
        # Success - store the working URL pattern for scripts
        print("\n[SUCCESS] Power Automate flow is working!")
        print("[RECOMMENDATION] Use this URL pattern in your production scripts")
        
    elif status_code == 401:
        print("[RESULT] ‚ùå 401 Unauthorized - SAS signature may be expired or invalid")
        print("[ACTION] Get a fresh URL from Power Automate portal")
        
    elif status_code == 404:
        print("[RESULT] ‚ùå 404 Not Found - flow may not exist or be published")
        print("[ACTION] Check if the flow is published and active")
        
    else:
        print(f"[RESULT] ‚ö†Ô∏è  Unexpected status code: {status_code}")
        print(f"[RESPONSE] {response_text}")
        
else:
    # Only reached when the trigger helper returned None (exception while sending)
    print("[RESULT] ‚ùå No response received - network or connection error")

# Closing summary: next steps when the trigger returned 200/202,
# troubleshooting hints otherwise.
print("\n" + "="*60)
print("IMPLEMENTATION SUMMARY")
print("="*60)

if response and response.status_code in [200, 202]:
    for _summary_line in (
        "‚úÖ SUCCESS! Your Power Automate flow works with SAS authentication",
        "? Implementation notes:",
        "   ‚Ä¢ No authentication headers required",
        "   ‚Ä¢ URL contains embedded SAS signature",
        "   ‚Ä¢ Simple POST request with JSON payload",
        "   ‚Ä¢ Returns 202 (Accepted) for successful triggers",
        "\nüìã Next Steps:",
        "1. Update your production script to use this exact URL",
        "2. Store the URL securely (contains access signature)",
        "3. Monitor for SAS signature expiration",
        "4. Test with your actual workflow payload",
        # Heading for the copy-paste template printed right after the loop
        "\nüîß Production Code Template:",
    ):
        print(_summary_line)

    print("""
def trigger_dataflow_refresh():
    url = "your_full_power_automate_url_with_sas"
    payload = {"trigger_source": "production_script", "timestamp": datetime.now().isoformat()}
    response = requests.post(url, json=payload, headers={'Content-Type': 'application/json'})
    return response.status_code in [200, 202]
""")

else:
    for _summary_line in (
        "‚ùå Flow trigger failed",
        "üí° Troubleshooting steps:",
        "   ‚Ä¢ Verify the flow is published and active in Power Automate portal",
        "   ‚Ä¢ Check if the SAS signature has expired",
        "   ‚Ä¢ Ensure the flow accepts HTTP requests from external sources",
        "   ‚Ä¢ Test the URL directly in a tool like Postman",
    ):
        print(_summary_line)

print("="*60)

KeyboardInterrupt: Interrupted by user