##### **This notebook implements a basic framework for incrementally updating a dataflow when the standard incremental refresh does not work because the data source has limitations around query folding and bucket size.**

It works as follows:
1. Configure this notebook as a notebook activity in a pipeline with the following base parameters:
- **dataflow_id** = id of the dataflow
- **workspace_id** = id of the workspace
- **dataflow_name** = name of the dataflow
- **initial_load_from_date** = start date of the history required for this dataflow. The first load extracts data starting from this date.
- **bucket_size_in_days** = bucket size for each load, in days. The first load processes the history in buckets of this size to limit the data volume per refresh.
- **reinitialize_dataflow** = refresh the entire data set and ignore any previous loads.
- **incrementally_update_last_n_days** = number of trailing days to update on each daily run; the default is 1
- **destination_table** = destination table in the warehouse where the data is written
- **incremental_update_column** = column in the destination table that is used for the incremental update

2. Configure the following constants inside this notebook:
    - SCHEMA = "[xxx].[dbo]" - Schema of the warehouse where the incremental update table resides
    - INCREMENTAL_TABLE = "[Incremental Update]" - Name of the incremental update table
    - CONNECTION_ARTIFACT = "xxx" - Name of the artifact, e.g. the warehouse name
    - CONNECTION_ARTIFACT_ID = "xxx" - Technical ID of the artifact, e.g. the warehouse id
    - CONNECTION_ARTIFACT_TYPE = "xxx" - Type of artifact, e.g. Warehouse

3. Based on the configured parameters, the notebook reads the Incremental Update table in the warehouse for the dataflow_id to determine whether there was a previous refresh. If there was none, it assumes that this is an initial refresh and proceeds with loading the history. If there was a previous refresh, it reads the range_start and range_end datetime parameters used for that refresh, together with its status. If the incremental update metadata table does not exist in the database, the notebook creates it and inserts the entry for the dataflow id.

4. If this is the initial load, the notebook calculates the range_start and range_end of the first bucket from initial_load_from_date and bucket_size_in_days, writes these values to the Incremental Update table in the warehouse, and sets the status of the load to Running. It repeats this process bucket by bucket until the history has been refreshed up to yesterday.

5. If this is the next incremental load, the notebook calculates the range_start and range_end based on the status of the last refresh, the range_start and range_end of the last load, the incrementally_update_last_n_days and bucket_size_in_days parameters.

6. The dataflow reads the range_start and range_end parameters from the Incremental Update table in the warehouse, then executes the Power Query to refresh the data for that range into the destination.
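
Steps 3 and 4 above can be sketched in isolation. The snippet below is a minimal, standalone illustration rather than the notebook's actual entry point: `choose_refresh_mode` and `split_into_buckets` are hypothetical helper names, and the bucket logic mirrors what the `_get_date_ranges` method further down appears to do. Because the end of every bucket except the last is rounded up to 23:59:59, a bucket that starts at midnight spans `bucket_size_in_days + 1` calendar days.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

# Statuses treated as a successful previous refresh (taken from the notebook code)
SUCCESS_STATUSES = {"Completed", "Success", "Successful"}


def choose_refresh_mode(last_status: Optional[str]) -> str:
    """Hypothetical helper: pick the refresh mode from the last recorded status.

    last_status is None when the tracking table has no row for the dataflow.
    """
    if last_status is None:
        return "initial"      # no previous refresh: load the full history
    if last_status not in SUCCESS_STATUSES:
        return "retry"        # previous range did not succeed: run it again
    return "incremental"      # previous range succeeded: move to the next range


def split_into_buckets(start: datetime, end: datetime,
                       bucket_size_in_days: int) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: split [start, end] into (range_start, range_end) windows."""
    buckets = []
    current_start = start
    while current_start < end:
        current_end = min(current_start + timedelta(days=bucket_size_in_days), end)
        # Every bucket except the last is extended to the end of its final day
        if current_end < end:
            current_end = current_end.replace(hour=23, minute=59, second=59, microsecond=0)
        buckets.append((current_start, current_end))
        # The next bucket starts one second later, so the windows never overlap
        current_start = current_end + timedelta(seconds=1)
    return buckets


# Example: ten days of history in buckets of three days
for range_start, range_end in split_into_buckets(
        datetime(2024, 1, 1), datetime(2024, 1, 10, 23, 59, 59), 3):
    print(range_start, "->", range_end)
```

With these inputs the history is covered by three windows. Each window is written to the Incremental Update table before the dataflow refresh for that window is triggered.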


In [None]:
# Parameters passed from the pipeline as base parameters of the notebook activity
# Workspace id
workspace_id = ''

# Dataflow id
dataflow_id = ''

# Dataflow name
dataflow_name = ''

# Initial load from date for the first load
initial_load_from_date = ''

# Bucket size for each load
bucket_size_in_days = 1

# Reinitialize dataflow
reinitialize_dataflow = False

# Incrementally update last n days
incrementally_update_last_n_days = 1

# Destination table
destination_table = ''

# Incremental update column
incremental_update_column = ''

In [None]:
from datetime import datetime, timedelta
import time
import logging
import pandas as pd
import sempy.fabric as fabric
from typing import Optional, Dict, Any, Union, Tuple, List

# Constants
SCHEMA = "[<your Warehouse or Lakehouse name>].[dbo]"
INCREMENTAL_TABLE = "[Incremental Update]"
CONNECTION_ARTIFACT = "<your Warehouse or Lakehouse name>"
CONNECTION_ARTIFACT_ID = "<your Warehouse or Lakehouse id>"
CONNECTION_ARTIFACT_TYPE = "<Warehouse or Lakehouse>"

class DataflowRefresher:
    """
    Class to incrementally refresh a dataflow in Microsoft Fabric
    with support for initial loads and incremental updates
    """
    def __init__(self, client, artifact: str, artifact_id: str, artifact_type: str,
                 schema: str, incremental_table: str, log_level: int):
        """
        Initialize the DataflowRefresher
        
        Args:
            client: API client for Microsoft Fabric
            artifact: Name of the artifact to connect to (e.g., "The Beer Store")
            artifact_id: ID of the artifact (e.g., the warehouse ID)
            artifact_type: Type of the artifact (e.g., "Warehouse")
            schema: Database schema name including brackets, e.g. "[The Beer Store].[dbo]"
            incremental_table: Name of the table tracking incremental updates (without schema)
            log_level: level of logging
        """
        self.client = client
        self.artifact = artifact
        self.artifact_id = artifact_id
        self.artifact_type = artifact_type
        self.connection = None
        self.schema = schema
        self.incremental_table = f"{schema}.{incremental_table}"
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(log_level)
        self._ensure_connection()
        self.create_incremental()
    
    def _create_connection(self):
        """
        Create a new connection to the artifact
        
        Returns:
            A new database connection
        """
        try:
            import notebookutils.data
            return notebookutils.data.connect_to_artifact(
                self.artifact, self.artifact_id, self.artifact_type)
        except Exception as e:
            self.logger.error(f"Error creating connection: {e}")
            raise
    
    def _ensure_connection(self):
        """
        Ensure that we have a valid database connection, creating a new one if needed
        
        Returns:
            A valid database connection
        """
        if self.connection is None:
            self.connection = self._create_connection()
            return self.connection
            
        # Test if the existing connection is still valid
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT 1")
            cursor.fetchall()
            cursor.close()
            return self.connection
        except Exception as e:
            self.logger.warning(f"Connection test failed, reconnecting: {e}")
            self._close_connection()
            self.connection = self._create_connection()
            return self.connection
    
    def _close_connection(self):
        """Close the current connection if it exists"""
        if self.connection is not None:
            try:
                self.connection.close()
            except Exception as e:
                self.logger.warning(f"Error closing connection: {e}")
            finally:
                self.connection = None
    
    def _execute_with_retry(self, sql, params=None, commit=True, max_retries=3):
        """
        Execute a SQL statement with automatic retry on connection issues
        
        Args:
            sql: SQL statement to execute
            params: Parameters for the SQL statement
            commit: Whether to commit the transaction
            max_retries: Maximum number of retries
            
        Returns:
            Database cursor
        """
        retries = 0
        last_error = None
        
        while retries < max_retries:
            try:
                # Ensure we have a valid connection
                self._ensure_connection()
                
                # Execute the SQL
                cursor = self.connection.execute(sql, params or ())
                
                # Commit if requested
                if commit:
                    self.connection.commit()
                    
                return cursor
                
            except Exception as e:
                last_error = e
                self.logger.warning(f"Database operation failed (attempt {retries+1}/{max_retries}): {e}")
                self._close_connection()  # Force reconnection on next attempt
                retries += 1
                
                # Small delay before retry
                if retries < max_retries:
                    time.sleep(1)
        
        # If we get here, we've exhausted retries
        self.logger.error(f"Database operation failed after {max_retries} attempts: {last_error}")
        raise last_error
    
    def create_incremental(self) -> bool:
        """
        Create the incremental update metadata table if it does not exist
        
        Returns:
            True if the statement executed without error
        """
        try:
            sql = f"""
                IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'Incremental Update' AND schema_id = SCHEMA_ID('dbo'))
                BEGIN
                CREATE TABLE {self.incremental_table}
                (
                    [dataflow_id] [varchar](60) NOT NULL,
                    [workspace_id] [VARCHAR](60) NOT NULL,
                    [dataflow_name] [varchar](60) NULL,
                    [initial_load_from_date] [datetime2](3) NOT NULL,
                    [bucket_size_in_days] [int] NOT NULL,
                    [incrementally_update_last_n_days] [int] NOT NULL,
                    [destination_table] [VARCHAR](60) NOT NULL,
                    [incremental_update_column] [VARCHAR](60) NOT NULL,
                    [update_time] [datetime2](3) NULL,
                    [status] [varchar](50) NOT NULL,
                    [range_start] [datetime2](3) NULL,
                    [range_end] [datetime2](3) NULL
                )
                END
            """
            self._execute_with_retry(sql)
            self.logger.info("Incremental update table exists or was created successfully")
            return True
            
        except Exception as e:
            self.logger.error(f"Error creating incremental update table: {e}")
            raise

    def get_incremental(self, dataflow_id: str) -> Optional[pd.DataFrame]:
        """
        Get the last refresh details from the tracking table
        
        Args:
            dataflow_id: ID of the dataflow
            
        Returns:
            DataFrame with the last refresh details or None if no records found
        """
        try:
            sql = f"""
                SELECT TOP (1) [dataflow_id],
                         [update_time],
                         [status],
                         [range_start],
                         [range_end]
                FROM {self.incremental_table}
                WHERE [dataflow_id] = ?
                ORDER BY [update_time] DESC
            """
            cursor = self._execute_with_retry(sql, (dataflow_id,), commit=False)
            columns = [column[0] for column in cursor.description]
            data = cursor.fetchall()
            data = [tuple(row) for row in data]
            
            if not data:
                self.logger.info(f"No previous refresh records found for dataflow {dataflow_id}")
                return None
                
            return pd.DataFrame(data, columns=columns)
            
        except Exception as e:
            self.logger.error(f"Error getting incremental refresh data: {e}")
            raise

    def insert_into_incremental(self, dataflow_id: str, workspace_id: str, dataflow_name: str, initial_load_from_date: str,
                            bucket_size_in_days: int, incrementally_update_last_n_days: int, destination_table: str,
                            incremental_update_column: str, status: str, 
                            range_start: datetime, range_end: datetime) -> bool:
        """
        Insert into incremental table in the warehouse
        
        Args:
            dataflow_id: ID of the dataflow
            workspace_id: ID of the workspace
            dataflow_name: Name or description of the dataflow
            initial_load_from_date: Initial load from date
            bucket_size_in_days: Bucket size of each refresh in days
            incrementally_update_last_n_days: Number of trailing days to update on each incremental refresh
            destination_table: Destination table in the warehouse where data is written
            incremental_update_column: Column of the destination table used for incremental update
            status: Status of the refresh operation
            range_start: Start of the refresh date range
            range_end: End of the refresh date range
            
        Returns:
            True if successful (errors are logged and re-raised)
        """
        try:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            start_formatted = range_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            end_formatted = range_end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            
            sql = f"""
                INSERT INTO {self.incremental_table}
                (
                    [dataflow_id], 
                    [workspace_id],
                    [dataflow_name],
                    [initial_load_from_date],
                    [bucket_size_in_days],
                    [incrementally_update_last_n_days],
                    [destination_table],
                    [incremental_update_column],
                    [update_time],
                    [status],
                    [range_start],
                    [range_end]
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
            
            params = (dataflow_id, workspace_id, dataflow_name, initial_load_from_date, 
                     bucket_size_in_days, incrementally_update_last_n_days, destination_table, 
                     incremental_update_column, current_time, status, start_formatted, end_formatted)
            
            self._execute_with_retry(sql, params)
            self.logger.info(f"Successfully inserted record for dataflow {dataflow_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Error inserting into incremental table: {e}")
            raise

    def delete_incremental(self, dataflow_id: str) -> bool:
        """
        Delete the entry from the tracking table
        
        Args:
            dataflow_id: ID of the dataflow
            
        Returns:
            True if successful (errors are logged and re-raised)
        """
        try:
            sql = f"""
                DELETE
                FROM {self.incremental_table}
                WHERE [dataflow_id] = ?
            """
            self._execute_with_retry(sql, (dataflow_id,))
            self.logger.info(f"Successfully deleted old record for dataflow {dataflow_id}")
            return True
            
        except Exception as e:
            self.logger.error(f"Could not delete old record for dataflow: {e}")
            raise

    def update_incremental(self, dataflow_id: str, status: str, 
                        range_start: datetime, range_end: datetime) -> bool:
        """
        Update incremental table in the warehouse with the new parameters
        
        Args:
            dataflow_id: ID of the dataflow
            status: Status of the refresh operation
            range_start: Start of the refresh date range
            range_end: End of the refresh date range
            
        Returns:
            True if successful, False otherwise
        """
        try:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            start_formatted = range_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            end_formatted = range_end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
            
            sql = f"""
                UPDATE {self.incremental_table}
                SET 
                [status] = ?,
                [update_time] = ?,
                [range_start] = ?,
                [range_end] = ?
                WHERE [dataflow_id] = ?
            """
            
            cursor = self._execute_with_retry(sql, (status, current_time, 
                                               start_formatted, end_formatted, dataflow_id))
            rows_affected = cursor.rowcount
            
            if rows_affected > 0:
                self.logger.info(f"Successfully updated record for dataflow {dataflow_id}")
                return True
            else:
                self.logger.warning(f"No records updated for dataflow {dataflow_id}, record may not exist")
                return False
                
        except Exception as e:
            self.logger.error(f"Error updating incremental table: {e}")
            raise

    def delete_data(self, table: str, column: str, 
                 range_start: Union[datetime, str], 
                 range_end: Union[datetime, str],
                 dataflow_id: Optional[str] = None) -> int:
        """
        Delete any overlapping data from the previous refreshes
        
        Args:
            table: Target table name
            column: Date column to use for filtering
            range_start: Start of the date range to delete (datetime or string)
            range_end: End of the date range to delete (datetime or string)
            dataflow_id: Optional ID of the dataflow for logging purposes
            
        Returns:
            Number of rows deleted
        """
        try:
            # Format dates if they're datetime objects
            start_formatted = range_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] if isinstance(range_start, datetime) else range_start
            end_formatted = range_end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] if isinstance(range_end, datetime) else range_end
            
            sql = f"""
                DELETE FROM {self.schema}.[{table}]
                WHERE [{column}] BETWEEN ? AND ?
            """
            
            cursor = self._execute_with_retry(sql, (start_formatted, end_formatted))
            rows_deleted = cursor.rowcount
            
            log_id = f" for {dataflow_id}" if dataflow_id else ""
            self.logger.info(f"Successfully deleted {rows_deleted} overlapping records{log_id} in table {table} between {start_formatted} and {end_formatted}")
            
            return rows_deleted
            
        except Exception as e:
            self.logger.error(f"Error deleting overlapping records: {e}")
            raise

    def refresh_dataflow(self, workspace_id: str, dataflow_id: str) -> Dict[str, Any]:
        """
        Refresh dataflow and return the response
        
        Args:
            workspace_id: ID of the workspace
            dataflow_id: ID of the dataflow
            
        Returns:
            Dictionary containing the response from the refresh API
        """
        try:
            response = self.client.post(f"/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/refreshes", json={"refreshRequest": "y"})
            self.logger.info(f"Successfully triggered refresh for dataflow {dataflow_id}")
            return response
        except Exception as e:
            self.logger.error(f"Error triggering refresh: {e}")
            raise

    def get_latest_refresh_status(self, workspace_id: str, dataflow_id: str) -> Optional[str]:
        """
        Get refresh status of a dataflow
        
        Args:
            workspace_id: ID of the workspace
            dataflow_id: ID of the dataflow
            
        Returns:
            Status of the latest refresh operation or None if not available
        """
        try:
            response = self.client.get(f"/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}/transactions")
            transactions = response.json()['value']
            if transactions:
                # Assumes the API returns the most recent transaction first
                latest_transaction = transactions[0]
                status = latest_transaction.get("status", "Unknown")
                self.logger.info(f"Latest refresh status for dataflow {dataflow_id}: {status}")
                return status
            else:
                self.logger.warning(f"No refresh transactions found for dataflow {dataflow_id}")
                return None
        except Exception as e:
            self.logger.error(f"Error checking refresh status: {e}")
            raise
            
    def wait_for_refresh_completion(self, workspace_id: str, dataflow_id: str, 
                                 timeout_minutes: int = 60, 
                                 check_interval_seconds: int = 30) -> str:
        """
        Wait for a dataflow refresh to complete
        
        Args:
            workspace_id: ID of the workspace
            dataflow_id: ID of the dataflow
            timeout_minutes: Maximum wait time in minutes
            check_interval_seconds: Interval between status checks in seconds
            
        Returns:
            Final status of the refresh operation
        """
        self.logger.info(f"Waiting for dataflow {dataflow_id} refresh to complete (timeout: {timeout_minutes} minutes)")
        
        start_time = datetime.now()
        timeout = timedelta(minutes=timeout_minutes)
        
        while datetime.now() - start_time < timeout:
            # Close existing connection before potentially long wait
            # This is critical to avoid idle timeout issues
            self._close_connection()
            
            status = self.get_latest_refresh_status(workspace_id, dataflow_id)
            
            if status in ["Success", "Failed", "Cancelled"]:
                self.logger.info(f"Dataflow refresh completed with status: {status}")
                return status
                
            self.logger.info(f"Current status: {status}, checking again in {check_interval_seconds} seconds")
            time.sleep(check_interval_seconds)
            
        self.logger.warning(f"Refresh timeout reached after {timeout_minutes} minutes")
        return "Timeout"

    def _parse_date(self, date_str: str) -> datetime:
        """
        Parse a date string into a datetime object
        
        Args:
            date_str: Date string in various formats
            
        Returns:
            Datetime object
        """
        try:
            # Try different formats
            for fmt in ['%Y-%m-%d', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S']:
                try:
                    return datetime.strptime(date_str, fmt)
                except ValueError:
                    continue
            
            # If all formats fail, raise exception
            raise ValueError(f"Unable to parse date string: {date_str}")
        except Exception as e:
            self.logger.error(f"Error parsing date: {e}")
            raise

    def _get_date_ranges(self, start_date: datetime, end_date: datetime, bucket_size_days: int) -> List[Tuple[datetime, datetime]]:
        """
        Split a date range into smaller buckets
        
        Args:
            start_date: Start date of the range
            end_date: End date of the range
            bucket_size_days: Size of each bucket in days
            
        Returns:
            List of (start_date, end_date) tuples for each bucket
        """
        date_ranges = []
        current_start = start_date
        
        while current_start < end_date:
            current_end = min(current_start + timedelta(days=bucket_size_days), end_date)
            
            # Extend the end of every bucket except the last to 23:59:59 of its final day,
            # so such a bucket can span up to bucket_size_days + 1 calendar days
            if current_end < end_date:
                current_end = datetime(current_end.year, current_end.month, current_end.day, 23, 59, 59)
                
            date_ranges.append((current_start, current_end))
            
            # Start the next bucket one second after the current end
            current_start = current_end + timedelta(seconds=1)
        
        return date_ranges

    def execute_incremental_refresh(self, 
                               workspace_id: str, 
                               dataflow_id: str,
                               dataflow_name: str,
                               destination_table: str,
                               incremental_update_column: str,
                               initial_load_from_date: Optional[str] = None,
                               bucket_size_in_days: int = 30,
                               reinitialize_dataflow: bool = False,
                               incrementally_update_last_n_days: Optional[int] = None,
                               wait_for_completion: bool = True,
                               timeout_minutes: int = 120) -> Dict[str, Any]:
        """
        Execute a complete incremental refresh workflow following the specified logic
        
        Args:
            workspace_id: ID of the workspace
            dataflow_id: ID of the dataflow
            dataflow_name: Name of the dataflow
            destination_table: Name of the destination table
            incremental_update_column: Column used for incremental updates
            initial_load_from_date: Start date for the initial load (required for first load)
            bucket_size_in_days: Size of each refresh bucket in days
            reinitialize_dataflow: Whether to reinitialize the dataflow
            incrementally_update_last_n_days: Number of days to update incrementally
            wait_for_completion: Whether to wait for refresh completion
            timeout_minutes: Timeout when waiting for completion
            
        Returns:
            Dictionary containing execution results and statistics
        """
        # Ensure we have a fresh connection at the start
        self._ensure_connection()
        
        start_time = datetime.now()
        self.logger.info(f"Starting incremental refresh for dataflow {dataflow_id}")
        
        # Get yesterday's date at 23:59:59 as the default end date
        yesterday = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
        yesterday_end = datetime(yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
        
        # Initialize statistics
        stats = {
            "dataflow_id": dataflow_id,
            "start_time": start_time,
            "buckets_refreshed": 0,
            "successful_refreshes": 0,
            "failed_refreshes": 0,
            "current_status": "Started"
        }
        
        try:
            # Check if we need to reinitialize the dataflow
            if reinitialize_dataflow:
                self.logger.info(f"Reinitializing dataflow {dataflow_id} as requested")
                self.delete_incremental(dataflow_id)
                last_refresh_data = None
            else:
                # Get the last refresh record
                last_refresh_df = self.get_incremental(dataflow_id)
                last_refresh_data = last_refresh_df.iloc[0].to_dict() if last_refresh_df is not None else None
            
            # Case 1: No previous refresh or reinitializing dataflow
            if last_refresh_data is None:
                self.logger.info(f"No previous refresh found or reinitializing for dataflow {dataflow_id}")
                
                if not initial_load_from_date:
                    raise ValueError("initial_load_from_date is required for the first load")
                
                start_date = self._parse_date(initial_load_from_date)
                end_date = yesterday_end
                
                self.logger.info(f"Performing initial load from {start_date} to {end_date}")
                
                # Split the date range into buckets
                date_ranges = self._get_date_ranges(start_date, end_date, bucket_size_in_days)
                
                stats["date_ranges"] = len(date_ranges)
                stats["start_date"] = start_date
                stats["end_date"] = end_date
                
                for i, (range_start, range_end) in enumerate(date_ranges):
                    self.logger.info(f"Processing bucket {i+1}/{len(date_ranges)}: {range_start} to {range_end}")
                    
                    # Delete data in the range
                    self.delete_data(destination_table, incremental_update_column, range_start, range_end, dataflow_id)
                    
                    # Record the bucket as "Running": insert on the first bucket, update on later ones
                    if i == 0:
                        self.insert_into_incremental(dataflow_id, workspace_id, dataflow_name, initial_load_from_date,
                                        bucket_size_in_days, incrementally_update_last_n_days, destination_table,
                                        incremental_update_column, "Running", range_start, range_end)
                    else:
                        self.update_incremental(dataflow_id, "Running", range_start, range_end)
                        
                    # Close connection before long-running dataflow operation
                    self._close_connection()

                    # Trigger dataflow refresh
                    self.refresh_dataflow(workspace_id, dataflow_id)
                    
                    # Wait for completion if requested
                    if wait_for_completion:
                        # Already closed connection before this step
                        # Add a small buffer of 5 seconds just to make sure the status is updated
                        time.sleep(5)
                        status = self.wait_for_refresh_completion(workspace_id, dataflow_id, timeout_minutes)
                        
                        # Ensure fresh connection for database operations
                        self._ensure_connection()
                        
                        # Update status in the incremental table
                        self.update_incremental(dataflow_id, status, range_start, range_end)

                        stats["buckets_refreshed"] += 1
                        if status == "Success":
                            stats["successful_refreshes"] += 1
                        else:
                            stats["failed_refreshes"] += 1
                            self.logger.warning(f"Refresh failed for range {range_start} to {range_end} with status {status}")
                    else:
                        self.logger.info("Not waiting for completion, continuing with next bucket")
                        stats["buckets_refreshed"] += 1
            
            # Case 2: Previous refresh exists and we're not reinitializing
            else:
                last_status = last_refresh_data.get('status')
                last_range_start = last_refresh_data.get('range_start')
                last_range_end = last_refresh_data.get('range_end')
                
                self.logger.info(f"Previous refresh found with status '{last_status}' for range {last_range_start} to {last_range_end}")
                
                # Case 2a: Previous refresh was not successful, retry it
                if last_status not in ["Completed", "Success", "Successful"]:
                    self.logger.info(f"Previous refresh was not successful, retrying for range {last_range_start} to {last_range_end}")
                    
                    # Parse dates
                    range_start = last_range_start if isinstance(last_range_start, datetime) else self._parse_date(last_range_start)
                    range_end = last_range_end if isinstance(last_range_end, datetime) else self._parse_date(last_range_end)
                    
                    # Delete data in the range
                    self.delete_data(destination_table, incremental_update_column, range_start, range_end, dataflow_id)
                    
                    # Update status to "Running"
                    self.update_incremental(dataflow_id, "Running", range_start, range_end)
                    
                    # Close connection before long-running operation
                    self._close_connection()
                    
                    # Trigger dataflow refresh
                    self.refresh_dataflow(workspace_id, dataflow_id)
                    
                    # Wait for completion if requested
                    if wait_for_completion:
                        # Already closed connection above
                        # Add a small buffer of 5 seconds before checking status
                        time.sleep(5)
                        status = self.wait_for_refresh_completion(workspace_id, dataflow_id, timeout_minutes)
                        
                        # Ensure fresh connection for database operations
                        self._ensure_connection()
                        
                        self.update_incremental(dataflow_id, status, range_start, range_end)
                        
                        stats["buckets_refreshed"] += 1
                        if status == "Success":
                            stats["successful_refreshes"] += 1
                        else:
                            stats["failed_refreshes"] += 1
                            self.logger.warning(f"Retry refresh failed for range {range_start} to {range_end} with status {status}")
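                    else:
                        # Not waiting for completion: the bucket is counted as triggered, with no success or failure recorded
                        stats["buckets_refreshed"] += 1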
                
                # Case 2b: Previous refresh was successful, continue with next incremental update
                else:
                    self.logger.info("Previous refresh was successful, calculating next incremental update range")
                    
                    # Parse the last range end date
                    last_end_date = last_range_end if isinstance(last_range_end, datetime) else self._parse_date(last_range_end)

                    # End date is yesterday
                    range_end = yesterday_end

                    # If incrementally_update_last_n_days is provided, use it to potentially overlap with previous data
                    if incrementally_update_last_n_days:
                        possible_start = yesterday - timedelta(days=incrementally_update_last_n_days)
                        # Use the minimum of the two possible start dates to ensure there are no gaps
                        # If last_end_date + 1 second is earlier, use that to continue from where we left off
                        # If possible_start is earlier, use that to ensure we include the last N days
                        range_start = min(last_end_date + timedelta(seconds=1), possible_start)
                        self.logger.info(f"Using start date {range_start} (minimum of last_end_date+1 second and last {incrementally_update_last_n_days} days)")
                    else:
                        # Start one second after the last end date
                        range_start = last_end_date + timedelta(seconds=1)
                        self.logger.info(f"Using start date {range_start} (continuing from last end date)")

                    # Only proceed if there's actual data to refresh
                    if range_start < range_end:
                        self.logger.info(f"Performing incremental update from {range_start} to {range_end}")

                        # Split the date range into buckets
                        date_ranges = self._get_date_ranges(range_start, range_end, bucket_size_in_days)

                        stats["date_ranges"] = len(date_ranges)
                        stats["start_date"] = range_start
                        stats["end_date"] = range_end
                        
                        for i, (bucket_start, bucket_end) in enumerate(date_ranges):
                            self.logger.info(f"Processing bucket {i+1}/{len(date_ranges)}: {bucket_start} to {bucket_end}")
                            
                            # Delete data in the range
                            self.delete_data(destination_table, incremental_update_column, bucket_start, bucket_end, dataflow_id)
                            
                            # Insert a record with "Running" status
                            self.update_incremental(dataflow_id, "Running", bucket_start, bucket_end)
                            
                            # Close connection before long-running operation
                            self._close_connection()
                            
                            # Trigger dataflow refresh
                            self.refresh_dataflow(workspace_id, dataflow_id)
                            
                            # Wait for completion if requested
                            if wait_for_completion:
                                # Already closed connection above
                                # Add a small buffer of 5 seconds before checking status
                                time.sleep(5)
                                status = self.wait_for_refresh_completion(workspace_id, dataflow_id, timeout_minutes)
                                
                                # Ensure fresh connection for database operations
                                self._ensure_connection()
                                
                                # Update status in the incremental table
                                self.update_incremental(dataflow_id, status, bucket_start, bucket_end)
                                
                                stats["buckets_refreshed"] += 1
                                if status == "Success":
                                    stats["successful_refreshes"] += 1
                                else:
                                    stats["failed_refreshes"] += 1
                                    self.logger.warning(f"Refresh failed for range {bucket_start} to {bucket_end} with status {status}")
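                            else:
                                # Not waiting for completion: the bucket is counted as triggered, with no success or failure recorded
                                stats["buckets_refreshed"] += 1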
                    else:
                        self.logger.info(f"No new data to refresh. Start date {range_start} is on or after end date {range_end}")
                        stats["current_status"] = "No new data to refresh"
            
            # Calculate total duration
            stats["end_time"] = datetime.now()
            stats["duration_seconds"] = (stats["end_time"] - start_time).total_seconds()
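            # Keep the "No new data to refresh" status set above instead of overwriting it
            if stats.get("current_status") != "No new data to refresh":
                stats["current_status"] = "Completed" if stats["failed_refreshes"] == 0 else f"Completed with {stats['failed_refreshes']} failures"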
            
            self.logger.info(
                f"Incremental refresh completed for dataflow {dataflow_id}. "
                f"Total buckets: {stats['buckets_refreshed']}, "
                f"Successful: {stats['successful_refreshes']}, "
                f"Failed: {stats['failed_refreshes']}"
            )
            
            return stats
            
        except Exception as e:
            error_message = f"Error executing incremental refresh: {e}"
            self.logger.error(error_message)
            
            # Update stats with error information
            stats["end_time"] = datetime.now()
            stats["duration_seconds"] = (stats["end_time"] - start_time).total_seconds()
            stats["current_status"] = f"Failed: {e}"
            stats["error"] = str(e)
            
            # The finally block below still runs on return and closes the connection
            return stats
        finally:
            # Make sure to close the connection when done
            self._close_connection()
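
The start-date rule in Case 2b above can be tried in isolation. The sketch below is illustrative only: `next_range_start` is a hypothetical helper that is not part of `DataflowRefresher`, and it assumes `yesterday` is midnight at the start of the previous day, which this section does not confirm.

```python
from datetime import datetime, timedelta
from typing import Optional


def next_range_start(
    last_end_date: datetime,
    yesterday: datetime,
    incrementally_update_last_n_days: Optional[int] = None,
) -> datetime:
    """Return the start of the next incremental window (hypothetical helper).

    Resume one second after the last loaded range, but reach back further
    when the "last N days" overlap window starts earlier than that.
    """
    resume_from = last_end_date + timedelta(seconds=1)
    if incrementally_update_last_n_days:
        overlap_start = yesterday - timedelta(days=incrementally_update_last_n_days)
        # Taking the earlier of the two avoids gaps and still covers the overlap
        return min(resume_from, overlap_start)
    return resume_from


# Assumed sample values: the last load ended at 2024-03-09 23:59:59
# and "yesterday" is midnight of 2024-03-10.
last_end = datetime(2024, 3, 9, 23, 59, 59)
yesterday = datetime(2024, 3, 10)

no_overlap = next_range_start(last_end, yesterday)
with_overlap = next_range_start(last_end, yesterday, incrementally_update_last_n_days=3)

print(no_overlap)    # 2024-03-10 00:00:00
print(with_overlap)  # 2024-03-07 00:00:00
```

With an overlap of three days the window reaches back before the last loaded range, so those days are deleted and reloaded. Without an overlap it simply resumes where the previous load stopped.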

# Create the Power BI REST client
client = fabric.PowerBIRestClient()

# Create the dataflow refresher object
dataflow_refresher = DataflowRefresher(
    client,
    CONNECTION_ARTIFACT,
    CONNECTION_ARTIFACT_ID,
    CONNECTION_ARTIFACT_TYPE,
    SCHEMA,
    INCREMENTAL_TABLE,
    logging.INFO,
)

# Incrementally refresh the dataflow
dataflow_refresher.execute_incremental_refresh(
    workspace_id,
    dataflow_id,
    dataflow_name,
    destination_table,
    incremental_update_column,
    initial_load_from_date,
    bucket_size_in_days,
    reinitialize_dataflow,
    incrementally_update_last_n_days,
)
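
Because `execute_incremental_refresh` catches exceptions and returns the `stats` dictionary instead of re-raising, a failed refresh does not by itself fail the notebook run. One possible way to surface failures to the calling pipeline is to inspect the returned dictionary. The sketch below is a minimal illustration: `raise_on_failed_refresh` is a hypothetical helper, and `sample_stats` is made-up data that only reuses the key names seen in the method above.

```python
from typing import Any, Dict


def raise_on_failed_refresh(stats: Dict[str, Any]) -> str:
    """Return a one-line summary, or raise if the run reported problems."""
    error = stats.get("error")
    if error:
        raise RuntimeError(f"Incremental refresh aborted: {error}")

    failed = stats.get("failed_refreshes", 0)
    total = stats.get("buckets_refreshed", 0)
    if failed:
        raise RuntimeError(f"{failed} of {total} bucket refreshes failed")

    succeeded = stats.get("successful_refreshes", 0)
    duration = stats.get("duration_seconds", 0.0)
    status = stats.get("current_status", "Unknown")
    return f"{status}: {succeeded}/{total} buckets refreshed in {duration:.0f}s"


# Made-up example of what a successful run might return
sample_stats = {
    "buckets_refreshed": 3,
    "successful_refreshes": 3,
    "failed_refreshes": 0,
    "duration_seconds": 125.4,
    "current_status": "Completed",
}

summary = raise_on_failed_refresh(sample_stats)
print(summary)  # Completed: 3/3 buckets refreshed in 125s
```

In the notebook this would be applied to the value returned by `execute_incremental_refresh`, so that an unhandled `RuntimeError` marks the notebook activity as failed.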
