In [None]:
import duckdb
import requests
from deltalake import DeltaTable, write_deltalake
from typing import List, Tuple, Union, Any
from notebookutils.common import configs
configs.tokenCacheEnabled = False

class Tasksql:
    """
    A class to manage interactions with a Lakehouse using DuckDB and Delta Lake,
    using a connect() class method for initialization.
    """

    def __init__(self, workspace: str, lakehouse_name: str, schema: str, sql_folder: str, compaction_threshold: int = 10):
        """
        Initializes the Tasksql connector instance.
        This constructor is intended to be called by the class method `connect`.

        Args:
            workspace (str): The name of the workspace.
            lakehouse_name (str): The name of the Lakehouse (LH).
            schema (str): The schema within the Lakehouse (e.g., 'dbo').
            sql_folder (str): The path or URL to the folder containing SQL files.
            compaction_threshold (int): The number of files in a Delta table
                                        before compaction is triggered (default is 10).
        """
        self.workspace = workspace
        self.lakehouse_name = lakehouse_name
        self.schema = schema
        self.sql_folder = sql_folder
        self.compaction_threshold = compaction_threshold
        self.table_base_url = f'abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_name}.Lakehouse/Tables/'
        self.con = duckdb.connect()
        self._attach_lakehouse() # Automatically attach lakehouse on initialization

    @classmethod
    def connect(cls, workspace: str, lakehouse_name: str, schema: str, sql_folder: str, compaction_threshold: int = 10):
        """
        Class method to create and initialize a Tasksql connector instance.

        Args:
            workspace (str): The name of the workspace.
            lakehouse_name (str): The name of the Lakehouse (LH).
            schema (str): The schema within the Lakehouse (e.g., 'dbo').
            sql_folder (str): The path or URL to the folder containing SQL files.
            compaction_threshold (int): The number of files in a Delta table
                                        before compaction is triggered (default is 10).

        Returns:
            Tasksql: An initialized instance of the Tasksql class.
        """
        print("Connecting to Lakehouse...")
        # Create an instance of the class using the provided arguments
        return cls(workspace, lakehouse_name, schema, sql_folder, compaction_threshold)


    def _get_storage_token(self):
        """
        Retrieves the storage access token using notebookutils.
        Assumes notebookutils is available in the execution environment.
        """
        try:
            # Replace with the actual way to get the token in your environment
            # This is a placeholder based on your original code
            # If not in Fabric/Synapse notebooks, you'll need another method
            return notebookutils.credentials.getToken('storage')
        except NameError:
            # print("Warning: notebookutils not found. Cannot get storage token.") # Reduced verbosity
            # print("Please ensure this code is run in an environment where notebookutils is available.") # Reduced verbosity
            return "PLACEHOLDER_TOKEN_TOKEN_NOT_AVAILABLE"


    def _create_onelake_secret(self):
        """
        Creates or replaces the OneLake secret in DuckDB.
        """
        token = self._get_storage_token()
        if token != "PLACEHOLDER_TOKEN_TOKEN_NOT_AVAILABLE":
             self.con.sql(f"""
                CREATE or replace SECRET onelake (
                    TYPE AZURE,
                    PROVIDER ACCESS_TOKEN,
                    ACCESS_TOKEN '{token}'
                )
            """)
             # print("OneLake secret created/replaced.") # Reduced verbosity
        else:
            print("Skipping OneLake secret creation due to missing storage token.")


    def _attach_lakehouse(self):
        """
        Attaches the Lakehouse tables as views in the DuckDB connection.
        """
        self._create_onelake_secret()
        try:
            # Query to find delta table paths in the Lakehouse
            list_tables_query = f"""
                SELECT DISTINCT(split_part(file, '_delta_log', 1)) as tables
                FROM glob ("abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_name}.Lakehouse/Tables/*/*/_delta_log/*.json")
            """
            list_tables_df = self.con.sql(list_tables_query).df()
            list_tables = list_tables_df['tables'].tolist() if not list_tables_df.empty else []

            if not list_tables:
                print(f"No Delta tables found in {self.lakehouse_name}.Lakehouse/Tables.")
                return 'No tables found'

            print(f"Found {len(list_tables)} Delta tables. Attaching as views...")

            for table_path in list_tables:
                parts = table_path.strip("/").split("/")
                # Assuming path is like .../Lakehouse/Tables/schema/table_name
                if len(parts) >= 2:
                     # Extract schema and table name from the path
                    potential_schema = parts[-2]
                    table = parts[-1]

                    # Only create view if the schema matches the configured schema for this instance
                    if potential_schema == self.schema:
                        try:
                            view_query = f"""
                                CREATE OR REPLACE view {table}
                                AS select * FROM delta_scan('{self.table_base_url}{self.schema}/{table}');
                            """
                            self.con.sql(view_query)
                            # print(f"View created/replaced for table: {table}") # Reduced verbosity
                        except Exception as e:
                            print(f"Error creating/replacing view for table {table}: {e}")
                            # Continue to the next table even if one fails
                            pass
                    else:
                        # print(f"Skipping table '{table}' in schema '{potential_schema}' as it doesn't match the configured schema '{self.schema}'.") # Reduced verbosity
                        pass
                else:
                    print(f"Skipping invalid table path format: {table_path}")


            print("\nAttached tables (views) in DuckDB:")
            # Show the created views in the 'memory' database (DuckDB's default)
            self.con.sql("""
                SELECT name, column_names
                FROM (show all tables)
                WHERE database='memory'
            """).show(max_width=130)

            return 'success'

        except Exception as e:
            print(f"An error occurred during lakehouse attachment: {e}")
            return 'error'


    def _read_sql_file(self, table_name: str) -> str | None:
        """
        Reads the SQL content from a local file or a GitHub URL.

        Args:
            table_name (str): The name of the table, used to construct the file name.

        Returns:
            str | None: The content of the SQL file, or None if an error occurs.
        """
        file_path = f'{self.sql_folder}/{table_name}.sql'
        is_github = self.sql_folder.startswith("http")
        sql_content = None

        try:
            if is_github:
                # print(f"Fetching SQL from URL: {file_path}") # Reduced verbosity
                response = requests.get(file_path)
                response.raise_for_status() # Raise an exception for bad status codes
                sql_content = response.text
            else:
                # print(f"Reading SQL from local file: {file_path}") # Reduced verbosity
                with open(file_path, 'r') as file:
                    sql_content = file.read()

            if not sql_content or not sql_content.strip():
                print(f"Error: SQL content is empty or could not be read from '{file_path}'.")
                return None

            return sql_content

        except FileNotFoundError:
            print(f"Error: The file '{file_path}' was not found.")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Error fetching SQL from URL {file_path}: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred while reading SQL file: {e}")
            return None


    def _write_delta(self, sql_content: str, table_name: str, mode: str):
        """
        Writes data to a Delta table based on SQL query results.

        Args:
            sql_content (str): The SQL query to get the data.
            table_name (str): The name of the target Delta table.
            mode (str): The write mode ('overwrite', 'append', 'ignore').

        Raises:
            Exception: If the write operation fails.
        """
        tbl_path = f"{self.table_base_url}{self.schema}/{table_name}"
        # Recommended Row Group size for Delta Lake
        RG = 8_000_000

        try:
            # Execute the SQL query and get results as a PyArrow RecordBatch
            df = self.con.sql(sql_content).record_batch()

            # Write the RecordBatch to the Delta Lake table
            write_deltalake(
                tbl_path,
                df,
                mode=mode,
                max_rows_per_file=RG,
                max_rows_per_group=RG,
                min_rows_per_group=RG,
                engine='pyarrow' # Specify pyarrow engine
            )
            # print(f'Table {table_name} updated, Delta mode {mode}') # Reduced verbosity

        except Exception as e:
            print(f"Error writing to delta table {table_name} in mode {mode}: {e}")
            raise # Re-raise the exception after printing

    def _run_sql_internal(self, table_name: str | None = None, mode: str | None = None, sql_statement: str | None = None) -> int:
        """
        Internal helper method to execute SQL and optionally write to Delta Lake.

        Args:
            table_name (str, optional): The name of the table. Required if sql_statement is None
                                        and mode is not None. Used to construct the SQL file path
                                        or the target Delta table path.
            mode (str, optional): The mode of operation for Delta Lake writes.
                                 Allowed values are None, 'overwrite', 'append', 'ignore'.
                                 Defaults to None (direct SQL execution).
            sql_statement (str, optional): A direct SQL statement string to execute.
                                           If provided, the method will use this string
                                           instead of reading from a file.

        Returns:
            int: 1 if the SQL run was successful, 0 otherwise.
        """
        successful_runs = 0

        sql_content = None

        if sql_statement is not None:
            sql_content = sql_statement
            # print("Using provided SQL statement.") # Reduced verbosity
            if mode is not None and (table_name is None or not table_name.strip()):
                 print("Error: table_name is required when using sql_statement with a write mode ('overwrite', 'append', 'ignore').")
                 return 0

        elif table_name is not None and table_name.strip():
            sql_content = self._read_sql_file(table_name)
            if sql_content is None:
                 return 0
            # print(f"Using SQL from file for table: {table_name}") # Reduced verbosity

        else:
            print("Error: Either 'table_name' (for file reading) or 'sql_statement' must be provided.")
            return 0

        if not sql_content or not sql_content.strip():
             print("Error: SQL content is empty after attempting to load.")
             return 0

        self._create_onelake_secret() # Ensure secret is available

        try:
            if mode in ['overwrite', 'append', 'ignore']:
                if table_name is None or not table_name.strip():
                     print(f"Error: table_name is required for Delta write mode '{mode}'.")
                     return 0

                # print(f"Running in mode: {mode} for table: {table_name}") # Reduced verbosity
                tbl_path = f"{self.table_base_url}{self.schema}/{table_name}"

                if mode == 'overwrite':
                    try:
                        self.con.sql(f'drop VIEW if exists {table_name}')
                        self._write_delta(sql_content, table_name, mode)
                        self.con.sql(f"""
                            CREATE OR REPLACE VIEW {table_name} AS
                            SELECT * FROM delta_scan('{tbl_path}')
                        """)
                        successful_runs += 1
                        dt = DeltaTable(tbl_path)
                        dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)
                        dt.cleanup_metadata()
                        # print("Vacuum and metadata cleanup completed.") # Reduced verbosity
                    except Exception as e:
                        print(f"Error updating data or creating view in overwrite mode for {table_name}: {e}")

                elif mode == 'append':
                    try:
                        self._write_delta(sql_content, table_name, mode)
                        self.con.sql(f"""
                            CREATE OR REPLACE VIEW {table_name} AS
                            SELECT * FROM delta_scan('{tbl_path}')
                        """)
                        successful_runs += 1
                        dt = DeltaTable(tbl_path)
                        if len(dt.files()) > self.compaction_threshold:
                            print(f"Number of files ({len(dt.files())}) exceeds compaction threshold ({self.compaction_threshold}). Compacting...")
                            dt.optimize.compact()
                            dt.vacuum(retention_hours=7, dry_run=False, enforce_retention_duration=False)
                            dt.cleanup_metadata()
                            print("Compaction, vacuum, and metadata cleanup completed.")
                        else:
                             # print(f"Number of files ({len(dt.files())}) is below compaction threshold ({self.compaction_threshold}). Skipping compaction.") # Reduced verbosity
                             pass


                    except Exception as e:
                        print(f"Error updating data or creating view in append mode for {table_name}: {e}")

                elif mode == 'ignore':
                    try:
                        dt = DeltaTable(tbl_path)
                        # print(f"{table_name} exists already. Ignoring write operation.") # Reduced verbosity
                        successful_runs += 1
                    except Exception:
                        print(f"{table_name} does not exist. Writing in overwrite mode.")
                        try:
                            self._write_delta(sql_content, table_name, 'overwrite')
                            self.con.sql(f"""
                                CREATE OR REPLACE VIEW {table_name} AS
                                SELECT * FROM delta_scan('{tbl_path}')
                            """)
                            successful_runs += 1
                            dt = DeltaTable(tbl_path)
                            dt.vacuum(retention_hours=7, dry_run=False, enforce_retention_duration=False)
                            dt.cleanup_metadata()
                            # print(f"{table_name} created and vacuumed.") # Reduced verbosity
                        except Exception as e:
                            print(f"Error creating table in ignore mode (using overwrite) for {table_name}: {e}")

            elif mode is None:
                # print(f"Running direct SQL execution.") # Reduced verbosity
                try:
                    self.con.sql(sql_content).show()
                    # print("SQL executed successfully.") # Reduced verbosity
                    successful_runs += 1
                except Exception as e:
                    print(f"Error executing SQL: {e}")

        except Exception as e:
            print(f"An unexpected error occurred during SQL execution: {e}")

        return successful_runs

    def write(self, table_name: str, mode: str) -> int:
        """
        Writes data to a Delta Lake table based on SQL read from a file.

        Args:
            table_name (str): The name of the table. Used to construct the SQL file path
                                and the target Delta table path.
            mode (str): The write mode of operation for Delta Lake writes.
                                 Allowed values are 'overwrite', 'append', 'ignore'.

        Returns:
            int: 1 if the write operation was successful, 0 otherwise.
        """
        allowed_write_modes = ['overwrite', 'append', 'ignore']
        if mode not in allowed_write_modes:
            print(f"Error: Invalid write mode '{mode}'. Allowed modes are: {', '.join(allowed_write_modes)}")
            return 0

        if not isinstance(table_name, str) or not table_name.strip():
             print("Error: 'table_name' must be a non-empty string for write operations.")
             return 0

        # print(f"Attempting to write to table: {table_name} in mode: {mode}") # Reduced verbosity
        # Call the internal helper method with mode and table_name
        return self._run_sql_internal(table_name=table_name, mode=mode, sql_statement=None)

    def sql(self, sql_statement: str | None = None, table_name: str | None = None) -> int:
        """
        Executes a SQL query. Can accept a direct SQL statement or read from a file.
        This method does NOT perform any Delta Lake write operations.

        Args:
            sql_statement (str, optional): A direct SQL statement string to execute.
                                           If provided, the method will use this string.
            table_name (str, optional): The name of the table. If sql_statement is None,
                                        this is used to read SQL from a file.

        Returns:
            int: 1 if the SQL execution was successful, 0 otherwise.
        """
        if sql_statement is None and (table_name is None or not isinstance(table_name, str) or not table_name.strip()):
             print("Error: Either 'sql_statement' or 'table_name' must be provided for SQL execution.")
             return 0

        # print(f"Attempting to execute SQL (from statement: {sql_statement is not None}, from file: {table_name if table_name else 'None'})") # Reduced verbosity
        # Call the internal helper method with mode=None
        return self._run_sql_internal(table_name=table_name, mode=None, sql_statement=sql_statement)


    def run_sql_sequence(self, tasks_list: List[Union[Tuple[str, str], Tuple[str, Any, Any, Any]]]) -> bool:
        """
        Runs a sequence of SQL tasks. Stops if any task fails.
        Each task can be a tuple in one of two formats:
          - (table_name, mode): For Delta Lake write operations (infers 'write' task type).
                                table_name and mode ('overwrite', 'append', 'ignore') are required.
          - (task_type, table_name, mode, sql_statement): Explicit task definition.
                                task_type can be 'write' or 'sql'.
                                Parameters depend on task_type as described in previous version.

        Args:
            tasks_list (List[Union[Tuple[str, str], Tuple[str, Any, Any, Any]]]): A list of task tuples.

        Returns:
            bool: True if all tasks completed successfully, False otherwise.
        """
        previous_task_successful = True

        for i, task in enumerate(tasks_list):
            if previous_task_successful:
                result = 0
                task_info = None # For logging

                if isinstance(task, (list, tuple)):
                    if len(task) == 2:
                        # Assume it's a write task: (table_name, mode)
                        table_name = task[0] if len(task) > 0 else None
                        mode = task[1] if len(task) > 1 else None
                        task_type = 'write' # Inferred
                        task_info = f"Write Task: Table: {table_name}, Mode: {mode}"

                        # Validate parameters for inferred 'write' task
                        if not isinstance(table_name, str) or not table_name.strip() or mode not in ['overwrite', 'append', 'ignore']:
                            print(f"Error: Invalid parameters for inferred 'write' task at index {i}: {task}. Expected (table_name, mode) with valid table_name and mode. Skipping.")
                            previous_task_successful = False
                            continue

                        print(f"\n--- Running Task {i+1}: {task_info} ---")
                        result = self.write(table_name=table_name, mode=mode)

                    elif len(task) == 4:
                        # Assume it's an explicit task: (task_type, table_name, mode, sql_statement)
                        task_type = task[0] if len(task) > 0 else None
                        table_name = task[1] if len(task) > 1 else None
                        mode = task[2] if len(task) > 2 else None
                        sql_statement = task[3] if len(task) > 3 else None
                        task_info = f"Explicit Task: Type: {task_type}, Table: {table_name}, Mode: {mode}, Using Statement: {sql_statement is not None}"


                        if task_type not in ['write', 'sql']:
                            print(f"Error: Invalid task type at index {i}: {task_type}. Expected 'write' or 'sql'. Skipping this task.")
                            previous_task_successful = False
                            continue

                        print(f"\n--- Running Task {i+1}: {task_info} ---")

                        if task_type == 'write':
                            # Validate parameters for explicit 'write' task
                            if not isinstance(table_name, str) or not table_name.strip() or mode not in ['overwrite', 'append', 'ignore']:
                                print(f"Error: Invalid parameters for explicit 'write' task at index {i}: (table_name='{table_name}', mode='{mode}'). table_name must be a non-empty string and mode must be 'overwrite', 'append', or 'ignore'. Skipping.")
                                previous_task_successful = False
                                continue
                            result = self.write(table_name=table_name, mode=mode)
                        elif task_type == 'sql':
                            # Validate parameters for 'sql' task
                            if (sql_statement is None or not isinstance(sql_statement, str) or not sql_statement.strip()) and \
                               (table_name is None or not isinstance(table_name, str) or not table_name.strip()):
                                print(f"Error: Invalid parameters for 'sql' task at index {i}: (sql_statement='{sql_statement}', table_name='{table_name}'). Either sql_statement or table_name must be provided. Skipping.")
                                previous_task_successful = False
                                continue
                            result = self.sql(sql_statement=sql_statement, table_name=table_name)
                    else:
                        print(f"Error: Invalid task entry format at index {i}: {task}. Expected a tuple of length 2 (table_name, mode) or 4 (task_type, table_name, mode, sql_statement). Skipping this task.")
                        previous_task_successful = False
                        continue
                else:
                    print(f"Error: Invalid task entry type at index {i}: {task}. Expected a tuple or list. Skipping this task.")
                    previous_task_successful = False
                    continue


                if result == 1:
                    print(f"Task {i+1} successful.")
                    previous_task_successful = True
                else:
                    print(f"Task {i+1} failed. Stopping sequence.")
                    previous_task_successful = False
                    break
            else:
                # Construct skip info based on the task format encountered
                skip_info = "Unknown Task Format"
                if isinstance(task, (list, tuple)):
                    if len(task) == 2:
                        skip_info = f"Write Task: Table: {task[0]}, Mode: {task[1]}"
                    elif len(task) == 4:
                        skip_info = f"Explicit Task: Type: {task[0]}, Table: {task[1]}, Statement: {task[3] is not None}"

                print(f"Skipping Task {i+1} ({skip_info}) due to previous failure.")


        if previous_task_successful:
            print("\nAll specified SQL tasks completed successfully.")
        else:
            print("\nOne or more SQL tasks failed.")

        return previous_task_successful


    def get_connection(self):
        """
        Returns the active DuckDB connection object.
        """
        return self.con

    def close(self):
        """
        Closes the DuckDB connection.
        """
        if self.con:
            self.con.close()
            print("DuckDB connection closed.")

In [None]:
# Open the 'test' Lakehouse of the 'processing' workspace, schema 'new'.
# sql_folder carries no trailing slash: the file name is appended as
# '<sql_folder>/<table>.sql', so a trailing slash would yield a '//' in the URL.
con = Tasksql.connect(
    workspace='processing',
    lakehouse_name='test',
    schema='new',
    sql_folder='https://github.com/djouallah/Fabric_Notebooks_Demo/raw/refs/heads/main/orchestration/sql',
    compaction_threshold=100,
)

In [None]:
%%time
sql_tasks_to_run_nightly = [
    ('price', 'append'),
    ('scada', 'append'),
    ('duid', 'ignore'),
    ('summary', 'overwrite'),
    ('calendar', 'ignore'),
    ('mstdatetime', 'ignore')
    ]
con.run_sql_sequence(sql_tasks_to_run_nightly)

In [None]:
%%time
sql_tasks_to_intraday = [
    ('price_today', 'append'),
    ('scada_today', 'append'),
    ('duid', 'ignore'),
    ('summary', 'append')
]
con.run_sql_sequence(sql_tasks_to_intraday)