# Publish and Distribute Paginated Reports

This notebook executes a paginated report multiple times with different parameter values and saves the output to OneLake.

**Features:**
- 4 flexible parameter sources: Semantic Model, Lakehouse, JSON, Warehouse
- Automatic token refresh for long-running batches
- Configurable performance tuning parameters
- OneLake archival with date-based folder hierarchy
- Retry logic with exponential backoff
- Comprehensive logging and error handling
- Input validation and SQL injection protection
- Pipeline integration ready

**Pipeline Integration:**
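The notebook exits with JSON containing the list of archived files. Each file name follows the pattern `Report_{ParamName}_{SanitizedValue}_{YYYYMMDD_HHMMSS}.{ext}`:
```json
{
  "files": ["Files/reports/archive/2025/01/30/Report_Producer_Customer_A_20250130_143022.pdf", ...],
  "status": "success",
  "success_count": 10,
  "fail_count": 0
}
```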

Pipeline ForEach items reference: `@json(activity('NotebookActivity').output.status.Output.result.exitValue).files`

**Author:** Generated by Claude Code  
**Version:** 1.0

In [None]:
# CELL 1: Parameter Definitions
# These parameters can be overridden by pipeline.
# Every value is declared as a string; Cell 2 parses the JSON values and
# converts the numeric/boolean settings when it builds the config dictionary.

# Report configuration
workspace_id = ""                    # Fabric workspace GUID
report_id = ""                       # Paginated report GUID
output_format = "XLSX"                # PDF, XLSX, DOCX, PPTX, PNG (upper-cased in Cell 2)
static_params = "{}"                 # JSON object: {"start_date": "2024-01-01", "end_date": "2024-12-31"}
special_param_name = "Producer"       # Parameter name to loop through (also used in output file names)

# ============================================================
# SOURCE CONFIGURATION (supports 4 sources)
# ============================================================
# Selects which of the OPTION blocks below supplies the values to loop over.
special_values_source = "semantic_model"  # "semantic_model" | "lakehouse" | "json" | "warehouse"

# -------------------- OPTION 1: SEMANTIC MODEL (RECOMMENDED) --------------------
semantic_model_workspace_id = ""       # Workspace containing the semantic model (GUID)
semantic_model_dataset_id = ""         # Semantic model (dataset) GUID
semantic_model_dax_query = ""          # DAX query; the first column of the result supplies the values

# -------------------- OPTION 2: LAKEHOUSE TABLE --------------------
# The generated query also filters on IsActive and orders by SortOrder, ParameterName.
lakehouse_table = "parameter_config"  # Delta table name
lakehouse_category = "ProducerList"   # Value matched against the Category column
lakehouse_column = "ParameterValue"   # Column containing values
lakehouse_filter = ""                 # Optional: extra condition ANDed onto the WHERE clause

# -------------------- OPTION 3: JSON ARRAY --------------------
special_parameter_values = "[]"       # JSON: ["Producer A", "Producer B", "Producer C"]

# -------------------- OPTION 4: WAREHOUSE --------------------
# The generated query also filters on IsActive and orders by SortOrder.
warehouse_name = ""                   # Warehouse name
warehouse_table = ""                  # Table name (e.g., "dbo.ParameterConfig")
warehouse_column = ""                 # Column name
warehouse_category = ""               # Category filter (no Category condition when empty)

# ============================================================
# EXECUTION OPTIONS
# ============================================================
archive_to_onelake = "true"           # Save to OneLake ("true" enables it; compared case-insensitively)
max_retries = "3"                     # Retry attempts per report
export_timeout_seconds = "600"        # Max seconds to wait for export (10 minutes)
poll_interval_seconds = "5"           # Seconds between status polls
retry_backoff_base = "30"             # Base seconds for exponential backoff (30, 60, 120...)

# ============================================================
# PERFORMANCE TUNING (OPTIONAL)
# ============================================================
download_chunk_size_mb = "1"          # Download chunk size in MB
file_size_warning_mb = "500"          # File size warning threshold in MB
connection_timeout_seconds = "30"     # API connection timeout in seconds
download_timeout_seconds = "120"      # Download timeout in seconds
param_loader_retry_attempts = "3"     # Parameter loading retry attempts
param_loader_retry_delay_seconds = "5"  # Delay between parameter loading retries
token_refresh_interval_minutes = "45"  # Token refresh interval in minutes

# NOTE: Config dictionary is built in Cell 2 AFTER pipeline parameters are injected

In [None]:
# ============================================================================
# CELL 2: COMPLETE OOP IMPLEMENTATION
# ============================================================================

# ============================================================================
# IMPORTS AND PACKAGE INSTALLATION
# ============================================================================

import requests
import json
import time
import re
import struct
import sys
import os
import tempfile
import unicodedata
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Any, Optional, Tuple

def install_package(package: str, import_name: Optional[str] = None) -> None:
    """Install a pip package unless its module can already be imported.

    Args:
        package: Distribution name passed to ``pip install``.
        import_name: Module name used to probe whether the package is already
            present. Defaults to ``package`` with dashes replaced by
            underscores. Pass it explicitly when the two names differ (for
            example ``semantic-link`` is imported as ``sempy``); otherwise the
            probe fails and pip is invoked even though the package exists.

    Raises:
        subprocess.CalledProcessError: If the pip invocation exits non-zero.
    """
    import importlib
    import subprocess

    module_name = import_name or package.replace('-', '_')
    try:
        importlib.import_module(module_name)
    except ImportError:
        # Use the running interpreter so the package lands in this environment.
        subprocess.check_call([sys.executable, "-m", "pip", "install", package, "-q"])

# Install semantic-link for semantic model support
try:
    import sempy.fabric as fabric
except ImportError:
    install_package("semantic-link")
    import sempy.fabric as fabric

# Fabric imports
from notebookutils import mssparkutils

# ============================================================================
# LOGGER SETUP
# ============================================================================

import logging

# Root logging configuration: INFO level, "timestamp - LEVEL - message" lines.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by the code below.
logger = logging.getLogger(__name__)

# ============================================================================
# BUILD CONFIGURATION DICTIONARY
# ============================================================================
# Pipeline parameters are injected between Cell 1 and Cell 2, so we build
# the config dictionary here to capture the injected values

# Parse JSON parameters.
# Defensive: accept an already-parsed dict or a blank value instead of failing
# inside json.loads(), and reject anything that is not a JSON object up front
# (the API client later iterates the parameters with .items()).
if isinstance(static_params, dict):
    static_params_dict = static_params
elif static_params and str(static_params).strip():
    static_params_dict = json.loads(static_params)
else:
    static_params_dict = {}
if not isinstance(static_params_dict, dict):
    raise ValueError('static_params must be a JSON object, e.g. {"start_date": "2024-01-01"}')

# Strip whitespace from GUIDs (in case pipeline adds spaces)
workspace_id = workspace_id.strip() if workspace_id else ""
report_id = report_id.strip() if report_id else ""

# Build unified configuration dictionary for executor
config = {
    # Core report configuration
    'workspace_id': workspace_id,
    'report_id': report_id,
    'output_format': output_format.strip().upper(),
    'static_params': static_params_dict,
    'special_param_name': special_param_name,
    'special_values_source': special_values_source,

    # Source-specific configurations (keys match the ParameterLoader._load_from_* keyword arguments)
    'semantic_model': {
        'workspace_id': semantic_model_workspace_id.strip() if semantic_model_workspace_id else "",
        'dataset_id': semantic_model_dataset_id.strip() if semantic_model_dataset_id else "",
        'dax_query': semantic_model_dax_query
    },
    'lakehouse': {
        'table': lakehouse_table,
        'category': lakehouse_category,
        'column': lakehouse_column,
        'filter_clause': lakehouse_filter
    },
    'json': {
        'json_values': special_parameter_values
    },
    'warehouse': {
        'warehouse_name': warehouse_name,
        'table': warehouse_table,
        'column': warehouse_column,
        'category': warehouse_category
    },

    # Execution settings (convert strings to appropriate types).
    # str() keeps the flag working when a real boolean is supplied instead of "true"/"false".
    'archive_to_onelake': str(archive_to_onelake).strip().lower() == "true",
    'max_retries': int(max_retries),
    'export_timeout': int(export_timeout_seconds),
    'poll_interval': int(poll_interval_seconds),
    'retry_backoff_base': int(retry_backoff_base),
    'download_chunk_size_mb': int(download_chunk_size_mb),
    'file_size_warning_mb': int(file_size_warning_mb),
    'connection_timeout': int(connection_timeout_seconds),
    'download_timeout': int(download_timeout_seconds),
    'param_loader_max_retries': int(param_loader_retry_attempts),
    'param_loader_retry_delay': int(param_loader_retry_delay_seconds),
    'token_refresh_interval': int(token_refresh_interval_minutes)
}

# Only the first 8 characters of each GUID are logged.
logger.info("Config dictionary built successfully")
logger.info(f"  workspace_id: {config['workspace_id'][:8]}... (length={len(config['workspace_id'])})")
logger.info(f"  report_id: {config['report_id'][:8]}... (length={len(config['report_id'])})")
logger.info(f"  special_values_source: {config['special_values_source']}")


# ============================================================================
# INPUT VALIDATOR CLASS
# ============================================================================

class InputValidator:
    """Validate and sanitize user inputs to prevent injection attacks.

    All methods are static. The ``is_valid_*`` checks return ``False`` for
    ``None`` or non-string input rather than raising.
    """

    # Patterns are applied with fullmatch(): with re.match() a trailing "$"
    # also matches just before a final newline, so "name\n" would be accepted.
    _GUID_RE = re.compile(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
    _SQL_IDENTIFIER_RE = re.compile(r'[a-zA-Z_][a-zA-Z0-9_\.]*')
    _MAX_IDENTIFIER_LENGTH = 128
    _SOURCE_TYPES = ("semantic_model", "lakehouse", "json", "warehouse")
    _OUTPUT_FORMATS = ("PDF", "XLSX", "DOCX", "PPTX", "PNG")

    @staticmethod
    def is_valid_guid(value: str) -> bool:
        """Return True if ``value`` is a GUID in 8-4-4-4-12 hex form (case-insensitive)."""
        if not value or not isinstance(value, str):
            return False
        return InputValidator._GUID_RE.fullmatch(value.lower()) is not None

    @staticmethod
    def is_valid_sql_identifier(value: str) -> bool:
        """Validate SQL identifier (table/column name).

        Accepts letters, digits, underscore and dot, starting with a letter or
        underscore, with a maximum length of 128 characters.
        """
        if not isinstance(value, str):
            return False
        if len(value) > InputValidator._MAX_IDENTIFIER_LENGTH:
            return False
        return InputValidator._SQL_IDENTIFIER_RE.fullmatch(value) is not None

    @staticmethod
    def is_valid_source_type(value: str) -> bool:
        """Return True if ``value`` names one of the supported parameter sources."""
        return value in InputValidator._SOURCE_TYPES

    @staticmethod
    def is_valid_format(value: str) -> bool:
        """Return True if ``value`` is a supported output format (case-insensitive)."""
        return isinstance(value, str) and value.upper() in InputValidator._OUTPUT_FORMATS

    @staticmethod
    def sanitize_sql_string(value: str) -> str:
        """Escape ``value`` for use inside a single-quoted SQL literal.

        Single quotes are doubled; ``None`` becomes an empty string and other
        values are converted with ``str()`` first.
        """
        if value is None:
            return ""
        return str(value).replace("'", "''")


# ============================================================================
# TOKEN MANAGER CLASS
# ============================================================================

class TokenManager:
    """Manage Power BI API tokens with automatic refresh for long-running batches

    Power BI tokens typically expire after 1 hour. This class tracks token age
    and refreshes proactively to prevent mid-batch authentication failures.
    """

    def __init__(self, mssparkutils, refresh_interval_minutes: int = 45):
        """Store the credential provider and acquire the first token.

        Args:
            mssparkutils: Object exposing ``credentials.getToken(audience)``.
            refresh_interval_minutes: Token age after which
                ``ensure_valid_token`` requests a new token.

        Raises:
            ValueError: If the initial token cannot be acquired.
        """
        self.mssparkutils = mssparkutils
        self.refresh_interval = timedelta(minutes=refresh_interval_minutes)
        self.powerbi_token = None
        self.token_acquired_at = None
        self.refresh_tokens()  # Acquire initial tokens

    def refresh_tokens(self) -> Tuple[str, Dict[str, str]]:
        """Request a new token for the "pbi" audience and record when it was acquired.

        Returns:
            Tuple of (token, request headers built from that token).

        Raises:
            ValueError: If the credential provider fails. The original
                exception is attached as ``__cause__``.
        """
        try:
            self.powerbi_token = self.mssparkutils.credentials.getToken("pbi")
            self.token_acquired_at = datetime.now(timezone.utc)
            logger.info("‚úì Power BI API token refreshed successfully")
            return self.powerbi_token, self.get_headers()
        except Exception as e:
            logger.error("‚ùå Failed to refresh Power BI API token")
            # The message is truncated; the full error stays reachable via __cause__.
            raise ValueError(f"Token refresh failed: {str(e)[:200]}") from e

    def get_headers(self) -> Dict[str, str]:
        """Get current API headers with bearer token (no refresh is attempted)."""
        return {"Authorization": f"Bearer {self.powerbi_token}", "Content-Type": "application/json"}

    def ensure_valid_token(self) -> Dict[str, str]:
        """Return request headers, refreshing the token first when it is missing or too old.

        Raises:
            ValueError: If a refresh is needed and fails.
        """
        if self.token_acquired_at is None:
            self.refresh_tokens()
        else:
            token_age = datetime.now(timezone.utc) - self.token_acquired_at
            if token_age >= self.refresh_interval:
                logger.info(f"üîÑ Token is {token_age.total_seconds()/60:.1f} minutes old, refreshing...")
                self.refresh_tokens()
        return self.get_headers()


# ============================================================================
# PARAMETER LOADER CLASS
# ============================================================================

class ParameterLoader:
    """Load special parameter values from multiple sources with security and retry logic

    Supported sources are "semantic_model", "lakehouse", "json" and
    "warehouse". Every loader returns a list of strings; ``load`` adds retry
    handling and order-preserving de-duplication on top.
    """

    def __init__(self, mssparkutils, spark=None, max_retries: int = 3, retry_delay: int = 5):
        """Initialize parameter loader.

        Args:
            mssparkutils: Fabric utilities object (used for the warehouse SQL
                token and workspace name).
            spark: Spark session; required only for the lakehouse source.
            max_retries: Number of load attempts. Values below 1 are treated
                as a single attempt.
            retry_delay: Seconds to sleep between attempts.
        """
        self.mssparkutils = mssparkutils
        self.spark = spark
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.validator = InputValidator()

    def load(self, source_type: str, **config) -> List[str]:
        """Load parameters with retry logic and deduplication.

        Args:
            source_type: One of the supported source names.
            **config: Keyword arguments forwarded to the matching
                ``_load_from_*`` method.

        Returns:
            Unique values in first-seen order.

        Raises:
            ValueError: If ``source_type`` is not supported.
            Exception: If every attempt fails; the last error is attached as
                ``__cause__``.
        """
        if not self.validator.is_valid_source_type(source_type):
            raise ValueError(f"Invalid source type: {source_type}. Must be one of: semantic_model, lakehouse, json, warehouse")

        loaders = {
            "semantic_model": self._load_from_semantic_model,
            "lakehouse": self._load_from_lakehouse,
            "json": self._load_from_json,
            "warehouse": self._load_from_warehouse,
        }
        loader = loaders[source_type]

        # Always try at least once; with max_retries < 1 the loop would
        # otherwise be skipped and the method would silently return None.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                values = loader(**config)
            except Exception as e:
                if attempt < attempts - 1:
                    logger.warning(f"‚ö† Parameter loading attempt {attempt + 1} failed: {str(e)[:200]}")
                    logger.info(f"  ‚Üí Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue
                logger.error(f"‚ùå All {attempts} parameter loading attempts failed")
                raise Exception(f"Failed to load parameters from {source_type} after {attempts} attempts: {str(e)[:500]}") from e
            return self._deduplicate(values)

    @staticmethod
    def _deduplicate(values: List[str]) -> List[str]:
        """Drop repeated values while preserving first-seen order; log what was removed."""
        seen = set()
        unique_values = []
        duplicates = []
        for v in values:
            if v in seen:
                duplicates.append(v)
            else:
                seen.add(v)
                unique_values.append(v)

        if duplicates:
            logger.warning(f"‚ö† Removed {len(duplicates)} duplicate value(s) from parameter list")
            logger.debug(f"  Duplicates: {duplicates[:10]}")

        return unique_values

    def _load_from_semantic_model(self, workspace_id: str, dataset_id: str, dax_query: str, **kwargs) -> List[str]:
        """Load from Semantic Model (Power BI Dataset) via sempy.

        The first column of the DAX result supplies the values; ``None`` and
        blank entries are skipped.

        Raises:
            ValueError: On invalid GUIDs, an empty or over-long (>10000 chars)
                query, or an empty result.
        """
        if not self.validator.is_valid_guid(workspace_id):
            raise ValueError(f"Invalid workspace GUID format: {workspace_id}")
        if not self.validator.is_valid_guid(dataset_id):
            raise ValueError(f"Invalid dataset GUID format: {dataset_id}")
        if not dax_query or len(dax_query) > 10000:
            raise ValueError("DAX query must be between 1 and 10000 characters")

        logger.info(f"üìä Querying Semantic Model...")
        logger.info(f"   Workspace: {workspace_id[:8]}...")
        logger.info(f"   Dataset: {dataset_id[:8]}...")

        df = fabric.evaluate_dax(dataset=dataset_id, dax_string=dax_query, workspace=workspace_id)
        if df.empty:
            raise ValueError("Semantic model query returned no results")

        column_name = df.columns[0]
        values = [str(v) for v in df[column_name].tolist() if v is not None and str(v).strip()]
        logger.info(f"‚úì Loaded {len(values)} values from Semantic Model")
        return values

    def _load_from_lakehouse(self, table: str, category: str, column: str, filter_clause: str = "", **kwargs) -> List[str]:
        """Load from Lakehouse Delta table via Spark SQL.

        Builds ``SELECT {column} FROM {table} WHERE IsActive = true AND
        Category = '...'``, optionally ANDs ``filter_clause`` onto it, and
        orders by ``SortOrder, ParameterName``.

        Raises:
            Exception: If no Spark session was supplied.
            ValueError: On an invalid table/column name or a filter clause
                containing a blocked token.
        """
        if self.spark is None:
            raise Exception("Spark session not available for lakehouse source")
        if not self.validator.is_valid_sql_identifier(table):
            raise ValueError(f"Invalid table name: {table}")
        if not self.validator.is_valid_sql_identifier(column):
            raise ValueError(f"Invalid column name: {column}")

        # NOTE(review): this only doubles single quotes. Spark SQL string
        # literals may also honour backslash escapes depending on parser
        # settings -- confirm this is sufficient for untrusted categories.
        safe_category = self.validator.sanitize_sql_string(category)
        logger.info(f"üìä Executing Lakehouse query...")

        query = f"SELECT {column} FROM {table} WHERE IsActive = true AND Category = '{safe_category}'"
        if filter_clause:
            # Case-insensitive substring blacklist; the clause itself is raw SQL.
            dangerous_patterns = [';', '--', '/*', '*/', 'xp_', 'sp_', 'DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE']
            lowered_clause = filter_clause.lower()
            if any(pattern.lower() in lowered_clause for pattern in dangerous_patterns):
                raise ValueError(f"Filter clause contains potentially dangerous SQL keywords")
            query += f" AND ({filter_clause})"
        query += " ORDER BY SortOrder, ParameterName"

        df = self.spark.sql(query)
        values = [str(row[0]) for row in df.collect() if row[0] is not None]
        logger.info(f"‚úì Loaded {len(values)} values from Lakehouse")
        return values

    def _load_from_json(self, json_values: str, **kwargs) -> List[str]:
        """Load from JSON array.

        Accepts a JSON string or an already-parsed list. ``None`` and blank
        entries are skipped; everything else is converted with ``str()``.

        Raises:
            ValueError: If the input is empty, not valid JSON, or not a
                non-empty array.
        """
        logger.info(f"üìä Loading from JSON array...")
        if not json_values or json_values.strip() == "[]":
            raise ValueError("JSON parameter values cannot be empty")

        try:
            values = json.loads(json_values) if isinstance(json_values, str) else json_values
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON format: {str(e)}") from e

        if not isinstance(values, list) or len(values) == 0:
            raise ValueError("JSON parameter values must be a non-empty array")

        if len(values) > 1000:
            logger.warning(f"‚ö† JSON array has {len(values)} values. Consider using Lakehouse or Warehouse for large lists.")

        values = [str(v) for v in values if v is not None and str(v).strip()]
        logger.info(f"‚úì Loaded {len(values)} values from JSON")
        return values

    def _load_from_warehouse(self, warehouse_name: str, table: str, column: str, category: str = "", **kwargs) -> List[str]:
        """Load from Warehouse via SQL endpoint.

        Connects with pyodbc using an access token for the "sql" audience and
        runs ``SELECT {column} FROM {table} WHERE IsActive = 1`` (plus a
        Category condition when ``category`` is non-empty), ordered by
        ``SortOrder``.

        Raises:
            ValueError: On an invalid warehouse, table or column name.
            Exception: On any connection or query failure; the original error
                is attached as ``__cause__``.
        """
        import pyodbc

        if not warehouse_name or len(warehouse_name) > 128:
            raise ValueError("Invalid warehouse name")
        if not self.validator.is_valid_sql_identifier(table):
            raise ValueError(f"Invalid table name: {table}")
        if not self.validator.is_valid_sql_identifier(column):
            raise ValueError(f"Invalid column name: {column}")

        logger.info(f"üìä Querying Warehouse...")
        conn = None
        cursor = None

        try:
            # Pack the token as a length-prefixed UTF-16-LE byte string and
            # pass it as a pre-connect attribute.
            token = self.mssparkutils.credentials.getToken("sql")
            token_bytes = token.encode("UTF-16-LE")
            token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
            SQL_COPT_SS_ACCESS_TOKEN = 1256

            workspace_name = self.mssparkutils.runtime.context.get('workspaceName')
            if not workspace_name:
                raise ValueError("Could not determine workspace name from context")

            conn_str = f"Driver={{ODBC Driver 18 for SQL Server}};Server={workspace_name}.datawarehouse.fabric.microsoft.com;Database={warehouse_name};Encrypt=yes;TrustServerCertificate=no;"
            conn = pyodbc.connect(conn_str, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}, timeout=30)
            cursor = conn.cursor()

            # Identifiers were validated above; the category value is bound as
            # a query parameter instead of being interpolated into the SQL.
            query = f"SELECT {column} FROM {table} WHERE IsActive = 1"
            params = []
            if category:
                query += " AND Category = ?"
                params.append(category)
            query += " ORDER BY SortOrder"

            cursor.execute(query, *params)
            values = [str(row[0]) for row in cursor.fetchall() if row[0] is not None]
            logger.info(f"‚úì Loaded {len(values)} values from Warehouse")
            return values

        except Exception as e:
            raise Exception(f"Warehouse query failed: {str(e)[:500]}") from e
        finally:
            # Best-effort cleanup: a failure to close must not mask the real error.
            if cursor:
                try:
                    cursor.close()
                except Exception:
                    pass
            if conn:
                try:
                    conn.close()
                except Exception:
                    pass


# ============================================================================
# POWER BI API CLIENT CLASS
# ============================================================================

class PowerBIAPIClient:
    """Power BI REST API interactions for exporting one paginated report.

    Wraps the three steps of an export: start it (``initiate_export``), wait
    for it (``poll_status``) and fetch the result (``download_file``).
    """

    def __init__(self, token_manager: "TokenManager", config: Dict):
        """Initialize API client.

        Args:
            token_manager: Supplies request headers with a bearer token.
            config: Configuration dictionary; reads ``workspace_id``,
                ``report_id``, ``output_format``, ``connection_timeout``,
                ``poll_interval``, ``export_timeout``,
                ``download_chunk_size_mb``, ``file_size_warning_mb`` and
                ``download_timeout``.
        """
        self.token_manager = token_manager
        self.workspace_id = config['workspace_id']
        self.report_id = config['report_id']
        self.output_format = config['output_format']
        self.connection_timeout = config['connection_timeout']
        self.poll_interval = config['poll_interval']
        self.export_timeout = config['export_timeout']
        self.download_chunk_size_mb = config['download_chunk_size_mb']
        self.file_size_warning_mb = config['file_size_warning_mb']
        self.download_timeout = config['download_timeout']

    def _handle_api_response(self, response: "requests.Response", operation: str) -> None:
        """Raise for rate-limit (429) and error (>= 400) responses.

        On HTTP 429 this sleeps for the ``Retry-After`` number of seconds
        (60 when the header is missing or not an integer) and then raises an
        exception whose message starts with "Rate limited".

        Args:
            response: HTTP response to inspect.
            operation: Short description used in log and error messages.

        Raises:
            Exception: For any 429 or >= 400 status.
        """
        if response.status_code == 429:
            try:
                retry_after = int(response.headers.get('Retry-After', '60'))
            except (TypeError, ValueError):
                # Retry-After may also be an HTTP date; fall back to the default wait.
                retry_after = 60
            logger.warning(f"‚ö† API rate limit hit during {operation}")
            logger.info(f"  Waiting {retry_after} seconds before retry...")
            time.sleep(retry_after)
            raise Exception(f"Rate limited: {operation}. Retry after {retry_after}s")
        elif response.status_code >= 400:
            error_msg = f"API error during {operation}: HTTP {response.status_code}"
            try:
                error_detail = response.json().get('error', {}).get('message', response.text[:200])
                error_msg += f" - {error_detail}"
            except Exception:
                # Body was not the expected JSON shape; fall back to raw text.
                error_msg += f" - {response.text[:200]}"
            raise Exception(error_msg)

    def initiate_export(self, parameters: Dict[str, Any]) -> str:
        """Initiate paginated report export via Power BI REST API.

        Args:
            parameters: Report parameter names mapped to values; every value
                is sent as a string.

        Returns:
            The export id from the API response.

        Raises:
            Exception: If the API responds with an error status.
        """
        export_url = f"https://api.powerbi.com/v1.0/myorg/groups/{self.workspace_id}/reports/{self.report_id}/ExportTo"
        headers = self.token_manager.get_headers()
        body = {
            "format": self.output_format,
            "paginatedReportConfiguration": {
                "parameterValues": [{"name": k, "value": str(v)} for k, v in parameters.items()]
            }
        }
        response = requests.post(export_url, headers=headers, json=body, timeout=self.connection_timeout)
        self._handle_api_response(response, "initiate export")
        return response.json()['id']

    def poll_status(self, export_id: str) -> bool:
        """Poll export status until completion or timeout.

        Transient request/parse errors are logged and polling continues. A
        reported ``Failed`` status and rate limiting abort immediately.

        Returns:
            True once the export reports ``Succeeded``.

        Raises:
            Exception: If the export reports ``Failed`` or the API rate-limits.
            TimeoutError: If ``export_timeout`` seconds elapse first.
        """
        status_url = f"https://api.powerbi.com/v1.0/myorg/groups/{self.workspace_id}/reports/{self.report_id}/exports/{export_id}"
        start_time = time.time()
        poll_count = 0

        while time.time() - start_time < self.export_timeout:
            poll_count += 1
            # Re-check the token on every poll so a long-running export does
            # not keep using headers captured before the loop started.
            headers = self.token_manager.ensure_valid_token()
            try:
                response = requests.get(status_url, headers=headers, timeout=self.connection_timeout)
                self._handle_api_response(response, "poll export status")
                status_data = response.json()
                status = status_data.get('status')
            except Exception as e:
                if "Rate limited" in str(e):
                    raise
                logger.warning(f"‚ö† Error during status poll {poll_count}: {str(e)[:200]}")
                time.sleep(self.poll_interval)
                continue

            if status == 'Succeeded':
                logger.debug(f"Export succeeded after {poll_count} polls ({time.time() - start_time:.1f}s)")
                return True
            if status == 'Failed':
                # Raised outside the try block above: a failed export is
                # terminal and must not be retried until the timeout expires.
                error = (status_data.get('error') or {}).get('message', 'Unknown error')
                raise Exception(f"Export failed: {error}")
            if status not in ('Running', 'NotStarted'):
                logger.warning(f"‚ö† Unknown export status: {status}")
            time.sleep(self.poll_interval)

        raise TimeoutError(f"Export timeout after {self.export_timeout} seconds ({poll_count} polls)")

    def download_file(self, export_id: str) -> bytes:
        """Download exported report file.

        The body is streamed in ``download_chunk_size_mb`` chunks and returned
        as a single bytes object. A warning is logged once if the size passes
        ``file_size_warning_mb``.

        Raises:
            Exception: If the API responds with an error status.
        """
        file_url = f"https://api.powerbi.com/v1.0/myorg/groups/{self.workspace_id}/reports/{self.report_id}/exports/{export_id}/file"
        headers = self.token_manager.get_headers()
        response = requests.get(file_url, headers=headers, stream=True, timeout=self.download_timeout)
        try:
            self._handle_api_response(response, "download report file")

            chunk_size_bytes = self.download_chunk_size_mb * 1024 * 1024
            file_size_warning_bytes = self.file_size_warning_mb * 1024 * 1024

            # Collect chunks and join once; repeated bytes += copies the
            # whole buffer on every chunk.
            chunks = []
            total_bytes = 0
            size_warning_logged = False
            for chunk in response.iter_content(chunk_size=chunk_size_bytes):
                if not chunk:
                    continue
                chunks.append(chunk)
                total_bytes += len(chunk)
                if total_bytes > file_size_warning_bytes and not size_warning_logged:
                    logger.warning(f"‚ö† Report file exceeds {self.file_size_warning_mb}MB, may cause memory issues")
                    size_warning_logged = True

            return b''.join(chunks)
        finally:
            # Streamed responses hold the connection until closed.
            response.close()


# ============================================================================
# ONELAKE STORAGE CLASS
# ============================================================================

class OneLakeStorage:
    """File storage operations with sanitization and date-based hierarchy"""

    def __init__(self, mssparkutils, config: Dict):
        """Initialize storage handler.

        Args:
            mssparkutils: Object exposing ``fs.rm`` and ``fs.cp``.
            config: Configuration dictionary; reads ``output_format``,
                ``special_param_name`` and ``file_size_warning_mb``.
        """
        self.mssparkutils = mssparkutils
        self.output_format = config['output_format']
        self.special_param_name = config['special_param_name']
        self.file_size_warning_mb = config['file_size_warning_mb']

    def _sanitize_filename(self, text: str, max_length: int = 200) -> str:
        """Reduce ``text`` to an ASCII-only, filename-safe token.

        Accented characters are decomposed and reduced to their ASCII base;
        characters with no ASCII form are dropped. Anything other than word
        characters, whitespace and hyphens is removed, whitespace runs become
        a single underscore, and leading/trailing ``_``/``-`` are stripped.

        Returns:
            The sanitized text truncated to ``max_length``, or ``"unnamed"``
            when nothing usable remains.
        """
        if not text:
            return "unnamed"

        text = str(text)
        text = unicodedata.normalize('NFKD', text)
        text = text.encode('ascii', 'ignore').decode('ascii')
        text = re.sub(r'[^\w\s-]', '', text)
        text = re.sub(r'\s+', ' ', text)
        text = text.replace(' ', '_')
        text = text.strip('_-')

        if not text:
            text = "unnamed"
        if len(text) > max_length:
            text = text[:max_length]

        return text

    def _generate_file_name(self, special_value: str, timestamp: Optional[datetime] = None) -> str:
        """Generate descriptive filename with timezone-aware timestamp

        Format: Report_{ParamName}_{SanitizedValue}_{Timestamp}.{ext}
        Example: Report_Producer_AcmeCorp_20250130_143022.pdf

        Args:
            special_value: Parameter value the report was rendered for.
            timestamp: Time to embed; defaults to the current UTC time. A
                naive datetime is labelled as UTC without conversion.
        """
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        elif timestamp.tzinfo is None:
            timestamp = timestamp.replace(tzinfo=timezone.utc)

        sanitized_value = self._sanitize_filename(special_value, max_length=100)
        sanitized_param = self._sanitize_filename(self.special_param_name, max_length=50)
        timestamp_str = timestamp.strftime('%Y%m%d_%H%M%S')
        extension = self.output_format.lower()

        filename = f"Report_{sanitized_param}_{sanitized_value}_{timestamp_str}.{extension}"

        # Final safety check
        if len(filename) > 255:
            sanitized_value = self._sanitize_filename(special_value, max_length=50)
            filename = f"Report_{sanitized_param}_{sanitized_value}_{timestamp_str}.{extension}"

        return filename

    def save(self, file_content: bytes, special_value: str) -> str:
        """Save binary file to OneLake for archival with date-based organization

        This function properly handles binary content (PDF, XLSX, etc.) by:
        1. Writing to a temporary file first
        2. Copying to OneLake using mssparkutils.fs.cp()
        3. Cleaning up the temporary file

        Note: Requires a Lakehouse to be attached to the notebook as a data item.

        Args:
            file_content: Raw bytes of the exported report.
            special_value: Parameter value the report was rendered for; used
                in the file name.

        Returns:
            The destination path, ``Files/reports/archive/YYYY/MM/DD/<filename>``.

        Raises:
            Exception: If writing the temp file or copying fails; the original
                error is attached as ``__cause__``.
        """
        timestamp = datetime.now(timezone.utc)
        date_path = timestamp.strftime('%Y/%m/%d')
        filename = self._generate_file_name(special_value, timestamp)
        onelake_path = f"Files/reports/archive/{date_path}/{filename}"

        file_size_mb = len(file_content) / (1024 * 1024)
        if file_size_mb > self.file_size_warning_mb:
            logger.warning(f"‚ö† File size is {file_size_mb:.1f}MB, which is very large")

        temp_path = None
        try:
            # Create a temporary file to write binary content
            with tempfile.NamedTemporaryFile(delete=False, suffix=f".{self.output_format.lower()}") as temp_file:
                temp_file.write(file_content)
                temp_path = temp_file.name

            # Copy from temp file to OneLake using file:// protocol
            temp_url = f"file://{temp_path}"

            # Delete existing file if it exists (since cp doesn't support overwrite parameter)
            try:
                self.mssparkutils.fs.rm(onelake_path)
            except Exception:
                pass  # File doesn't exist, that's fine

            # Use mssparkutils.fs.cp to copy the file to OneLake
            self.mssparkutils.fs.cp(temp_url, onelake_path)

            logger.debug(f"  Saved {file_size_mb:.2f}MB to OneLake")

        except Exception as e:
            # Keep the message short; the full error stays available via __cause__.
            error_msg = str(e)
            if len(error_msg) > 200:
                error_msg = error_msg[:200]
            raise Exception(f"Failed to write to OneLake: {error_msg}") from e
        finally:
            # Clean up temporary file
            if temp_path and os.path.exists(temp_path):
                try:
                    os.unlink(temp_path)
                except Exception:
                    pass  # Ignore cleanup errors

        return onelake_path


# ============================================================================
# PAGINATED REPORT EXECUTOR CLASS (Main Orchestrator)
# ============================================================================

class PaginatedReportExecutor:
    """Main orchestrator for paginated report batch execution using composition"""
    
    def __init__(self, config: Dict, mssparkutils, spark=None):
        """Build the executor's collaborators, then validate and log the config.

        Args:
            config: Settings dictionary. This constructor itself reads the
                'token_refresh_interval', 'param_loader_max_retries',
                'param_loader_retry_delay' and 'archive_to_onelake' keys;
                the whole dictionary is also handed to the API client and
                (when archiving is on) to the storage helper.
            mssparkutils: Notebook utilities object passed on to the helpers.
            spark: Optional Spark session passed on to the parameter loader.

        Raises:
            ValueError: Propagated from ``_validate_all_parameters`` when a
                setting fails validation.
        """
        self.config, self.mssparkutils, self.spark = config, mssparkutils, spark

        # Startup banner
        rule = "=" * 60
        for banner_line in (f"\n{rule}", "INITIALIZING PAGINATED REPORT EXECUTOR", f"{rule}\n"):
            logger.info(banner_line)

        # Collaborators are created in a fixed order: validator, token
        # manager, parameter loader, API client, then (optionally) storage.
        self.validator = InputValidator()
        self.token_manager = TokenManager(
            mssparkutils,
            refresh_interval_minutes=config['token_refresh_interval'],
        )
        self.param_loader = ParameterLoader(
            mssparkutils,
            spark,
            max_retries=config['param_loader_max_retries'],
            retry_delay=config['param_loader_retry_delay'],
        )
        self.api_client = PowerBIAPIClient(self.token_manager, config)

        # No storage helper at all when archiving is switched off.
        self.storage = OneLakeStorage(mssparkutils, config) if config['archive_to_onelake'] else None

        # Fail fast on bad settings before reporting what will be used.
        self._validate_all_parameters()
        self._log_configuration()

        logger.info("‚úì Executor initialized successfully\n")
    
    def _validate_all_parameters(self):
        """Validate all configuration parameters"""
        if not self.validator.is_valid_guid(self.config['workspace_id']):
            raise ValueError(f"Invalid workspace_id format. Must be a valid GUID. Received: '{self.config['workspace_id']}'")
        if not self.validator.is_valid_guid(self.config['report_id']):
            raise ValueError(f"Invalid report_id format. Must be a valid GUID. Received: '{self.config['report_id']}'")
        if not self.validator.is_valid_format(self.config['output_format']):
            raise ValueError(f"Invalid output_format: {self.config['output_format']}. Must be one of: PDF, XLSX, DOCX, PPTX, PNG")
        if not self.validator.is_valid_source_type(self.config['special_values_source']):
            raise ValueError(f"Invalid special_values_source: {self.config['special_values_source']}. Must be one of: semantic_model, lakehouse, json, warehouse")
        
        # Validate source-specific parameters based on selected source
        logger.info(f"üìã Validating configuration for source type: {self.config['special_values_source']}")
        
        source_type = self.config['special_values_source']
        if source_type == "semantic_model":
            source_config = self.config['semantic_model']
            if not self.validator.is_valid_guid(source_config['workspace_id']):
                raise ValueError(f"Invalid semantic_model workspace_id format. Required for semantic_model source.")
            if not self.validator.is_valid_guid(source_config['dataset_id']):
                raise ValueError(f"Invalid semantic_model dataset_id format. Required for semantic_model source.")
            if not source_config['dax_query']:
                raise ValueError("semantic_model_dax_query is required for semantic_model source")
            logger.info(f"‚úì Semantic Model configuration validated")
        
        elif source_type == "lakehouse":
            source_config = self.config['lakehouse']
            if not source_config['table']:
                raise ValueError("lakehouse_table is required for lakehouse source")
            if not source_config['category']:
                raise ValueError("lakehouse_category is required for lakehouse source")
            if not source_config['column']:
                raise ValueError("lakehouse_column is required for lakehouse source")
            logger.info(f"‚úì Lakehouse configuration validated")
        
        elif source_type == "json":
            source_config = self.config['json']
            if not source_config['json_values'] or source_config['json_values'].strip() == "[]":
                raise ValueError("special_parameter_values is required for json source and cannot be empty")
            logger.info(f"‚úì JSON configuration validated")
        
        elif source_type == "warehouse":
            source_config = self.config['warehouse']
            if not source_config['warehouse_name']:
                raise ValueError("warehouse_name is required for warehouse source")
            if not source_config['table']:
                raise ValueError("warehouse_table is required for warehouse source")
            if not source_config['column']:
                raise ValueError("warehouse_column is required for warehouse source")
            logger.info(f"‚úì Warehouse configuration validated")
        
        # Validate static parameters
        if not isinstance(self.config['static_params'], dict):
            raise ValueError("static_params must be a dictionary")
        logger.info(f"‚úì Static parameters validated: {len(self.config['static_params'])} parameter(s)")
        
        logger.info("‚úì All configuration validated successfully")
    
    def _log_configuration(self):
        """Log startup configuration"""
        logger.info("Configuration:")
        logger.info(f"  Report ID: {self.config['report_id'][:8] if self.config['report_id'] else 'Not set'}...")
        logger.info(f"  Special parameter: {self.config['special_param_name']}")
        logger.info(f"  Source: {self.config['special_values_source']}")
        logger.info(f"  Output format: {self.config['output_format']}")
        logger.info(f"  Archive to OneLake: {self.config['archive_to_onelake']}")
        logger.info(f"  Export timeout: {self.config['export_timeout']}s")
        logger.info(f"  Max retries: {self.config['max_retries']}")
        logger.info(f"  Token refresh interval: {self.config['token_refresh_interval']} minutes")
    
    def _load_special_values(self) -> List[str]:
        """Load special parameter values from configured source"""
        logger.info("\n" + "="*60)
        logger.info("LOADING SPECIAL PARAMETER VALUES")
        logger.info("="*60 + "\n")
        
        source_type = self.config['special_values_source']
        source_config = self.config[source_type]
        
        try:
            special_values = self.param_loader.load(source_type, **source_config)
        except Exception as e:
            logger.error(f"‚ùå Failed to load parameters: {str(e)[:500]}")
            logger.error(f"   Source: {source_type}")
            logger.error(f"   Check your configuration and ensure:")
            logger.error(f"   - Data source exists and is accessible")
            logger.error(f"   - Permissions are granted")
            logger.error(f"   - Parameters are correctly formatted")
            raise
        
        logger.info(f"\n{'='*60}")
        logger.info(f"Parameter '{self.config['special_param_name']}' loaded: {len(special_values)} unique values")
        if len(special_values) <= 10:
            logger.info(f"Values: {special_values}")
        else:
            logger.info(f"First 10 values: {special_values[:10]}")
            logger.info(f"... and {len(special_values) - 10} more")
        logger.info(f"{'='*60}\n")
        
        # Validation
        if not special_values or len(special_values) == 0:
            raise ValueError(
                f"‚ùå No parameter values loaded from {source_type}! "
                f"Check your configuration:\n"
                f"  - Ensure the data source has data\n"
                f"  - Verify category/filter settings\n"
                f"  - Check permissions"
            )
        
        # Warnings for large batches
        if len(special_values) > 100:
            estimated_minutes = len(special_values) * 2
            logger.warning(f"‚ö† Processing {len(special_values)} values may take a long time")
            logger.warning(f"  Estimated time: {estimated_minutes} minutes (assuming 2 min per report)")
            logger.warning(f"  Token will auto-refresh every {self.config['token_refresh_interval']} minutes")
        
        if len(special_values) > 500:
            logger.warning(f"‚ö† Very large batch detected! This may take hours to complete.")
            logger.warning(f"  Recommendation: Use multiple pipelines to process in parallel")
        
        logger.info("‚úì Special parameter values loaded, deduplicated, and validated")
        
        return special_values
    
    def _execute_single_report(self, params: Dict, special_value: str) -> Dict:
        """Execute report for a single parameter value with retry logic"""
        start_time = datetime.now(timezone.utc)
        
        for attempt in range(self.config['max_retries']):
            try:
                # Ensure token is valid (refreshes if needed)
                self.token_manager.ensure_valid_token()
                
                # Step 1: Initiate export
                logger.info(f"  Step 1/4: Initiating report export...")
                export_id = self.api_client.initiate_export(params)
                logger.info(f"    ‚úì Export initiated. Export ID: {export_id[:8]}...")
                
                # Step 2: Poll for completion
                logger.info(f"  Step 2/4: Waiting for export to complete...")
                self.api_client.poll_status(export_id)
                logger.info(f"    ‚úì Export completed successfully")
                
                # Step 3: Download file
                logger.info(f"  Step 3/4: Downloading report file...")
                file_content = self.api_client.download_file(export_id)
                file_size_mb = len(file_content) / (1024 * 1024)
                logger.info(f"    ‚úì Downloaded {file_size_mb:.2f} MB")
                
                # Step 4: Save to OneLake (if enabled)
                onelake_path = None
                if self.storage:
                    logger.info(f"  Step 4/4: Saving to OneLake archive...")
                    onelake_path = self.storage.save(file_content, special_value)
                    logger.info(f"    ‚úì Saved to: {onelake_path}")
                
                # Success!
                end_time = datetime.now(timezone.utc)
                duration = (end_time - start_time).total_seconds()
                
                return {
                    'special_value': special_value,
                    'status': 'SUCCESS',
                    'onelake_path': onelake_path,
                    'file_size_mb': round(file_size_mb, 2),
                    'duration_seconds': round(duration, 2),
                    'timestamp': end_time.isoformat(),
                    'attempts': attempt + 1,
                    'error': None
                }
            
            except Exception as e:
                error_msg = str(e)[:500]
                
                if attempt < self.config['max_retries'] - 1:
                    wait_time = self.config['retry_backoff_base'] * (2 ** attempt)
                    logger.warning(f"  ‚úó Attempt {attempt + 1}/{self.config['max_retries']} failed: {error_msg}")
                    logger.info(f"  ‚Üí Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    # All retries exhausted
                    end_time = datetime.now(timezone.utc)
                    duration = (end_time - start_time).total_seconds()
                    logger.error(f"  ‚úó All {self.config['max_retries']} attempts failed for '{special_value}'")
                    logger.error(f"     Final error: {error_msg}")
                    
                    return {
                        'special_value': special_value,
                        'status': 'FAILED',
                        'onelake_path': None,
                        'file_size_mb': 0,
                        'duration_seconds': round(duration, 2),
                        'timestamp': end_time.isoformat(),
                        'attempts': self.config['max_retries'],
                        'error': error_msg
                    }
        
        # Should never reach here
        return {
            'special_value': special_value,
            'status': 'FAILED',
            'error': 'Unknown error - retry loop completed unexpectedly',
            'attempts': self.config['max_retries']
        }
    
    def _generate_summary(self, results: List[Dict], total_duration: float) -> Dict:
        """Generate execution summary and pipeline result JSON"""
        success_count = len([r for r in results if r['status'] == 'SUCCESS'])
        fail_count = len(results) - success_count
        total_size_mb = sum(r.get('file_size_mb', 0) for r in results)
        avg_duration = sum(r.get('duration_seconds', 0) for r in results) / len(results) if results else 0
        total_attempts = sum(r.get('attempts', 1) for r in results)
        
        logger.info(f"\n{'='*60}")
        logger.info("EXECUTION SUMMARY")
        logger.info(f"{'='*60}")
        logger.info(f"Total reports processed: {len(results)}")
        logger.info(f"‚úì Successful: {success_count}")
        logger.info(f"‚úó Failed: {fail_count}")
        logger.info(f"Total size: {total_size_mb:.2f} MB")
        logger.info(f"Average duration per report: {avg_duration:.1f} seconds")
        logger.info(f"Success rate: {(success_count/len(results)*100):.1f}%")
        logger.info(f"Total retry attempts: {total_attempts} (avg {total_attempts/len(results):.1f} per report)")
        
        # Print successful files
        if success_count > 0:
            logger.info(f"\n{'='*60}")
            logger.info(f"GENERATED FILES (Saved to OneLake): {success_count} files")
            logger.info(f"{'='*60}")
            # Show first 20, then summarize
            for idx, r in enumerate([r for r in results if r['status'] == 'SUCCESS'][:20], 1):
                logger.info(f"  {idx}. {r['onelake_path']} ({r['file_size_mb']} MB)")
            if success_count > 20:
                logger.info(f"  ... and {success_count - 20} more files")
        
        # Print failures
        if fail_count > 0:
            logger.info(f"\n{'='*60}")
            logger.error(f"FAILURES: {fail_count} reports failed")
            logger.info(f"{'='*60}")
            for idx, r in enumerate([r for r in results if r['status'] == 'FAILED'], 1):
                # Truncate error message for readability
                error_msg = r.get('error', 'Unknown error')
                if len(error_msg) > 200:
                    error_msg = error_msg[:200] + "..."
                logger.error(f"  {idx}. '{r['special_value']}': {error_msg}")
        
        completion_time = datetime.now(timezone.utc)
        logger.info(f"\n{'='*60}")
        logger.info(f"Execution completed at: {completion_time.strftime('%Y-%m-%d %H:%M:%S UTC')}")
        logger.info(f"{'='*60}\n")
        
        # Prepare file list for pipeline consumption
        successful_files = [
            r['onelake_path'] 
            for r in results 
            if r['status'] == 'SUCCESS' and r.get('onelake_path')
        ]
        
        # Prepare result for pipeline (must match pipeline's expected JSON structure)
        pipeline_result = {
            'files': successful_files,
            'status': 'success' if fail_count == 0 else 'partial_success' if success_count > 0 else 'failed',
            'total': len(results),
            'success_count': success_count,
            'fail_count': fail_count,
            'total_size_mb': round(total_size_mb, 2),
            'avg_duration_seconds': round(avg_duration, 2),
            'total_duration_seconds': round(total_duration, 2),
            'errors': [
                {'value': r['special_value'], 'error': r['error'][:200] if r.get('error') else 'Unknown'} 
                for r in results if r['status'] == 'FAILED'
            ][:50],  # Limit to first 50 errors to avoid huge JSON
            'timestamp': completion_time.isoformat(),
            'parameter_name': self.config['special_param_name'],
            'source_type': self.config['special_values_source']
        }
        
        logger.info(f"üì§ Returning result to pipeline:")
        logger.info(f"   Status: {pipeline_result['status']}")
        logger.info(f"   Files: {len(successful_files)} paths")
        logger.info(f"   Success: {success_count}/{len(results)}")
        
        return pipeline_result
    
    def execute_batch(self) -> Dict:
        """Main execution method - PUBLIC API"""
        logger.info("\n" + "="*60)
        logger.info("STARTING REPORT BATCH EXECUTION")
        logger.info("="*60 + "\n")
        sys.stdout.flush()
        
        execution_start_time = datetime.now(timezone.utc)
        
        print(f"\n‚è± Execution started at: {execution_start_time}")
        sys.stdout.flush()
        
        # Load special parameter values
        special_values = self._load_special_values()
        
        print(f"üìä Total items to process: {len(special_values)}")
        print(f"üîÑ Starting loop...")
        sys.stdout.flush()
        
        # Execute batch loop
        results = []
        for idx, special_value in enumerate(special_values, 1):
            # IMMEDIATE FEEDBACK - Print to stdout AND logger
            print(f"\n{'='*60}")
            print(f"‚ñ∂ PROCESSING {idx}/{len(special_values)}: {self.config['special_param_name']} = '{special_value}'")
            print(f"{'='*60}")
            sys.stdout.flush()
            
            logger.info(f"\n{'='*60}")
            logger.info(f"Processing {idx}/{len(special_values)}: {self.config['special_param_name']} = '{special_value}'")
            logger.info(f"{'='*60}\n")
            sys.stdout.flush()
            
            # Merge static parameters with current special value
            print(f"  ‚öô Merging parameters...")
            sys.stdout.flush()
            
            all_params = self.config['static_params'].copy()
            all_params[self.config['special_param_name']] = special_value
            
            print(f"  ‚úì Parameters merged")
            logger.info(f"  Parameters:")
            for key, value in all_params.items():
                value_str = str(value)
                if len(value_str) > 100:
                    value_str = value_str[:100] + "..."
                logger.info(f"    {key}: {value_str}")
            logger.info("")
            sys.stdout.flush()
            
            # Execute report with retry logic
            print(f"  üöÄ Calling execute_report_with_retry...")
            sys.stdout.flush()
            
            result = self._execute_single_report(all_params, special_value)
            
            print(f"  ‚úì execute_report_with_retry returned")
            sys.stdout.flush()
            
            results.append(result)
            
            # Print result summary with immediate flush
            if result['status'] == 'SUCCESS':
                print(f"  ‚úÖ SUCCESS ({result['duration_seconds']}s, {result['attempts']} attempt(s))")
                sys.stdout.flush()
                logger.info(f"\n  ‚úÖ SUCCESS ({result['duration_seconds']}s, {result['attempts']} attempt(s))")
                logger.info(f"     OneLake: {result['onelake_path']}")
                logger.info(f"     Size: {result['file_size_mb']} MB")
            else:
                print(f"  ‚ùå FAILED ({result['duration_seconds']}s, {result['attempts']} attempt(s))")
                print(f"     Error: {result['error']}")
                sys.stdout.flush()
                logger.error(f"\n  ‚ùå FAILED ({result['duration_seconds']}s, {result['attempts']} attempt(s))")
                logger.error(f"     Error: {result['error']}")
            
            sys.stdout.flush()
            
            # Progress update for large batches
            if len(special_values) > 20 and idx % 10 == 0:
                success_so_far = len([r for r in results if r['status'] == 'SUCCESS'])
                pct_complete = (idx / len(special_values)) * 100
                print(f"\n  üìä Progress: {idx}/{len(special_values)} ({pct_complete:.1f}%) - {success_so_far} successful")
                sys.stdout.flush()
                logger.info(f"\n  üìä Progress: {idx}/{len(special_values)} ({pct_complete:.1f}%) - {success_so_far} successful")
        
        print("\n" + "=" * 60)
        print("LOOP COMPLETED")
        print("=" * 60)
        sys.stdout.flush()
        
        execution_end_time = datetime.now(timezone.utc)
        total_duration = (execution_end_time - execution_start_time).total_seconds()
        
        logger.info(f"\n{'='*60}")
        logger.info("EXECUTION COMPLETE")
        logger.info(f"{'='*60}")
        logger.info(f"Total time: {total_duration:.1f} seconds ({total_duration/60:.1f} minutes)")
        logger.info(f"Average per report: {total_duration/len(results):.1f} seconds")
        sys.stdout.flush()
        
        print(f"\n‚úÖ BATCH EXECUTION COMPLETED SUCCESSFULLY")
        print(f"   Processed: {len(results)} reports")
        print(f"   Duration: {total_duration:.1f} seconds")
        sys.stdout.flush()
        
        # Generate and return summary
        return self._generate_summary(results, total_duration)


# ============================================================================
# MAIN EXECUTION
# ============================================================================

if __name__ == "__main__":
    # Entry point: build the executor from the notebook-level `config`,
    # run the whole batch, and hand the JSON summary back to the pipeline.
    logger.info(f"\n{'='*60}")
    logger.info("PAGINATED REPORT BATCH EXECUTOR v1.0")
    logger.info(f"Started at: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
    logger.info(f"{'='*60}\n")

    # Create executor with all configuration
    executor = PaginatedReportExecutor(
        config=config,
        mssparkutils=mssparkutils,
        spark=spark
    )

    # Execute batch
    result = executor.execute_batch()

    # Log completion BEFORE exiting: notebook.exit() ends the notebook run,
    # so a statement placed after it would never execute
    # (NOTE(review): per the Fabric notebook utilities docs - confirm).
    logger.info("‚úì Notebook completed successfully")

    # Exit for pipeline integration; the JSON string becomes the exit value.
    logger.info("\nüì§ Exiting notebook with result for pipeline...")
    mssparkutils.notebook.exit(json.dumps(result))