**_<mark>You can use more cores to make the backfill faster: 2, 4, 8, 16, 32 or 64.</mark>_**

In [1]:
#%%configure
#{"vCores": 2}

In [2]:
!pip install    obstore       --upgrade

Collecting obstore
  Downloading obstore-0.8.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (840 bytes)
Downloading obstore-0.8.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m27.4 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[?25hInstalling collected packages: obstore
Successfully installed obstore-0.8.2


In [3]:
# Best-effort environment setup: turn off the notebook token cache and refresh
# DuckDB's delta extension from the nightly repository. A failure here must not
# stop the notebook, but the reason is reported instead of being hidden.
try:
    import duckdb
    from notebookutils.common import configs
    configs.tokenCacheEnabled = False
    duckdb.sql(" force install delta from core_nightly;")
    duckdb.sql("update extensions").show()
except Exception as setup_error:
    # `except Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
    print(f"Environment setup skipped: {setup_error!r}")

┌────────────────┬──────────────┬─────────────────────┬──────────────────┬─────────────────┐
│ extension_name │  repository  │    update_result    │ previous_version │ current_version │
│    varchar     │   varchar    │       varchar       │     varchar      │     varchar     │
├────────────────┼──────────────┼─────────────────────┼──────────────────┼─────────────────┤
│ azure          │ core         │ NO_UPDATE_AVAILABLE │ 1593cb5          │ 1593cb5         │
│ delta          │ core_nightly │ NO_UPDATE_AVAILABLE │ b60cf00          │ b60cf00         │
└────────────────┴──────────────┴─────────────────────┴──────────────────┴─────────────────┘



In [4]:
from psutil import cpu_count

# cpu_count() can return None when the number of CPUs cannot be determined;
# fall back to 1 so the arithmetic below (and the sizing derived from `core`) still works.
core              = cpu_count() or 1
# Worker threads handed to the download tasks: two per core plus one.
Nbr_threads       = (core * 2) + 1

In [5]:
# Naming rules: the workspace, lakehouse and semantic model names must not contain
# spaces, and the lakehouse must not reuse the name of a recently deleted lakehouse.
ws = notebookutils.runtime.context.get("currentWorkspaceName")
lh = 'power'
schema = 'aemo'
semantic_model = "directlake_on_onelake"

# Number of days to download: 30 with 2 cores, doubling for every 2 extra cores.
# Replace the expression with a fixed number if you prefer.
nbr_days_download = int(30 * 2 ** (0.5 * (core - 2)))

In [6]:
# Remote locations of the transformation scripts and of the semantic model definition.
sql_folder = 'https://github.com/djouallah/fabric_demo/raw/refs/heads/main/transformation/'
directlake_model = "https://raw.githubusercontent.com/djouallah/fabric_demo/refs/heads/main/semantic_model/directlake.bim"

# An append compacts a table once it holds more data files than this.
compaction_threshold = 150

# Part of the requested window beyond 60 days (never negative); used below to
# decide whether the historical archive has to be downloaded at all.
remaining_files = max(nbr_days_download - 60, 0)

<mark>**Core Logic**</mark>

In [7]:
import duckdb
import requests
import os
import sys
import importlib.util
from   deltalake import DeltaTable, write_deltalake
from   typing import List, Tuple, Union, Any, Optional, Callable, Dict
from   string import Template
class Tasksql:
    """
    Simplified Lakehouse task runner supporting:
      - ('script_name', (args,))          → runs script_name.py → script_name(*args)
      - ('table_name', 'mode', {params})  → runs table_name.sql with params, writes to Delta

    Scripts and SQL files are looked up in ``sql_folder``, which is either a local
    directory or an http(s) URL prefix. Each instance owns one DuckDB connection in
    which the Delta tables of the configured schema are exposed as views.

    A task is considered successful only when its result compares equal to 1;
    ``run_pipeline`` stops at the first task returning anything else.
    """

    # Value returned by _get_storage_token() when AZURE_STORAGE_TOKEN is not set.
    _MISSING_TOKEN = "PLACEHOLDER_TOKEN_TOKEN_NOT_AVAILABLE"

    # Seconds to wait for an HTTP fetch of a .sql/.py file before giving up, so an
    # unresponsive server cannot hang the pipeline forever.
    _HTTP_TIMEOUT = 60

    def __init__(self, workspace: str, lakehouse_name: str, schema: str, sql_folder: str, compaction_threshold: int = 10):
        """
        Open a DuckDB connection and attach the lakehouse tables as views.

        Args:
            workspace: Workspace name used to build the abfss:// table URL.
            lakehouse_name: Lakehouse name (without the ``.Lakehouse`` suffix).
            schema: Schema folder under ``Tables/`` that tasks read from and write to.
            sql_folder: Local directory or http(s) URL prefix holding ``<name>.sql`` / ``<name>.py``.
            compaction_threshold: An append compacts the table once it has more files than this.
        """
        self.workspace = workspace
        self.lakehouse_name = lakehouse_name
        self.schema = schema
        self.sql_folder = sql_folder.strip()
        self.compaction_threshold = compaction_threshold
        self.table_base_url = f'abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_name}.Lakehouse/Tables/'
        self.con = duckdb.connect()
        self.con.sql("SET preserve_insertion_order = false")
        self._attach_lakehouse()

    @classmethod
    def connect(cls, workspace: str, lakehouse_name: str, schema: str, sql_folder: str, compaction_threshold: int = 10):
        """Alternate constructor: prints a progress line, then builds the instance (same arguments as ``__init__``)."""
        print("Connecting to Lakehouse...")
        return cls(workspace, lakehouse_name, schema, sql_folder.strip(), compaction_threshold)

    def _get_storage_token(self):
        """Return AZURE_STORAGE_TOKEN from the environment, or the missing-token sentinel."""
        return os.environ.get("AZURE_STORAGE_TOKEN", self._MISSING_TOKEN)

    def _create_onelake_secret(self):
        """
        (Re)create the DuckDB ``onelake`` Azure secret.

        When AZURE_STORAGE_TOKEN is set it is used as an access token. Otherwise a
        token is obtained through Azure CLI / interactive browser login, stored in
        AZURE_STORAGE_TOKEN, and a persistent credential-chain secret is created.
        """
        token = self._get_storage_token()
        if token != self._MISSING_TOKEN:
            self.con.sql(f"CREATE OR REPLACE SECRET onelake (TYPE AZURE, PROVIDER ACCESS_TOKEN, ACCESS_TOKEN '{token}')")
        else:
            print("Please login to Azure CLI")
            # Imported lazily: only needed when no token is provided by the environment.
            from azure.identity import AzureCliCredential, InteractiveBrowserCredential, ChainedTokenCredential
            credential = ChainedTokenCredential( AzureCliCredential(), InteractiveBrowserCredential())
            token = credential.get_token("https://storage.azure.com/.default")
            os.environ["AZURE_STORAGE_TOKEN"] = token.token
            self.con.sql("CREATE OR REPLACE PERSISTENT SECRET onelake (TYPE azure, PROVIDER credential_chain, CHAIN 'cli', ACCOUNT_NAME 'onelake')")

    def _attach_lakehouse(self):
        """
        Expose every Delta table found under ``Tables/<schema>/`` as a DuckDB view.

        Tables are discovered by globbing for ``_delta_log/*.json`` files. Errors are
        printed, not raised, so construction succeeds even if nothing can be attached.
        """
        self._create_onelake_secret()
        try:
            list_tables_query = f"""
                SELECT DISTINCT(split_part(file, '_delta_log', 1)) as tables
                FROM glob ("abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse_name}.Lakehouse/Tables/*/*/_delta_log/*.json")
            """
            list_tables_df = self.con.sql(list_tables_query).df()
            list_tables = list_tables_df['tables'].tolist() if not list_tables_df.empty else []

            if not list_tables:
                print(f"No Delta tables found in {self.lakehouse_name}.Lakehouse/Tables.")
                return

            print(f"Found {len(list_tables)} Delta tables. Attaching as views...")

            for table_path in list_tables:
                # Each path ends in .../<schema>/<table>/ ; keep only tables of our schema.
                parts = table_path.strip("/").split("/")
                if len(parts) >= 2:
                    potential_schema = parts[-2]
                    table = parts[-1]
                    if potential_schema == self.schema:
                        try:
                            self.con.sql(f"""
                                CREATE OR REPLACE VIEW {table}
                                AS SELECT * FROM delta_scan('{self.table_base_url}{self.schema}/{table}');
                            """)
                        except Exception as e:
                            print(f"Error creating view for table {table}: {e}")
            print("\nAttached tables (views) in DuckDB:")
            self.con.sql("SELECT name FROM (SHOW ALL TABLES) WHERE database='memory'").show()
        except Exception as e:
            print(f"Error attaching lakehouse: {e}")

    def _normalize_table_name(self, name: str) -> str:
        """
        Extract base table name by taking the part before the first double underscore '__'.
        If there is no double underscore, return the name as-is.

        Examples:
            'sales__update' → 'sales'
            'fact__daily_v2' → 'fact'
            'events' → 'events'
        """
        if '__' in name:
            return name.split('__', 1)[0]
        return name

    def _read_sql_file(self, table_name: str, params: Optional[Dict] = None) -> Optional[str]:
        """
        Load ``<table_name>.sql`` from ``sql_folder`` and substitute its parameters.

        Placeholders use ``string.Template`` syntax (``$ws``, ``${run_date}``).
        ``ws``, ``lh`` and ``schema`` are always available; ``params`` may add to or
        override them.

        Returns:
            The substituted SQL text, or None (after printing the reason) when the
            file cannot be fetched/read, is empty, or references a missing parameter.
        """
        if self.sql_folder.startswith("http"):
            url = f"{self.sql_folder.rstrip('/')}/{table_name}.sql".strip()
            try:
                resp = requests.get(url, timeout=self._HTTP_TIMEOUT)
                resp.raise_for_status()
                content = resp.text
            except Exception as e:
                print(f"Failed to fetch SQL from {url}: {e}")
                return None
        else:
            path = os.path.join(self.sql_folder, f"{table_name}.sql")
            try:
                with open(path, 'r') as f:
                    content = f.read()
            except Exception as e:
                print(f"Failed to read SQL file {path}: {e}")
                return None

        if not content.strip():
            print(f"SQL file is empty: {table_name}.sql")
            return None

        # Merge system + user params (user params win on conflict).
        full_params = {
            'ws': self.workspace,
            'lh': self.lakehouse_name,
            'schema': self.schema
        }
        if params:
            full_params.update(params)

        # Use string.Template ($ws, ${run_date}) — safe with DuckDB {}
        try:
            content = Template(content).substitute(full_params)
        except KeyError as e:
            # e.args[0] is the bare placeholder name; str(e) would wrap it in quotes.
            print(f"Missing parameter in SQL file: ${e.args[0]}")
            return None
        except Exception as e:
            print(f"Error during SQL template substitution: {e}")
            return None

        return content

    def _load_py_function(self, name: str) -> Optional[Callable]:
        """
        Load ``<name>.py`` from ``sql_folder`` and return the callable called ``name`` in it.

        Returns:
            The callable, or None when the file is missing, fails to load, or does
            not define a callable with that name.
        """
        try:
            if self.sql_folder.startswith("http"):
                url = f"{self.sql_folder.rstrip('/')}/{name}.py".strip()
                resp = requests.get(url, timeout=self._HTTP_TIMEOUT)
                resp.raise_for_status()
                namespace = {}
                # SECURITY NOTE(review): this executes code downloaded from sql_folder
                # with full privileges; only point sql_folder at a trusted location.
                exec(resp.text, namespace)
                func = namespace.get(name)
            else:
                path = os.path.join(self.sql_folder, f"{name}.py")
                if not os.path.isfile(path):
                    print(f"Python file not found: {path}")
                    return None
                spec = importlib.util.spec_from_file_location(name, path)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                func = getattr(mod, name, None)
            return func if callable(func) else None
        except Exception as e:
            print(f"Error loading Python function '{name}': {e}")
            return None

    def _run_py_task(self, name: str, args: tuple) -> int:
        """
        Run the Python task ``name(*args)`` and return whatever it returns.

        Returns 0 when the function cannot be loaded or raises. The pipeline treats
        any result other than 1 as "stop".
        """
        self._create_onelake_secret()
        func = self._load_py_function(name)
        if not func:
            return 0
        try:
            print(f"Running Python task: {name}{args}")
            result = func(*args)
            print(f"✅ Python task '{name}' completed.")
            return result
        except Exception as e:
            print(f"❌ Error in Python task '{name}': {e}")
            return 0

    def _write_and_register(self, sql: str, path: str, view_name: str, mode: str, drop_view_first: bool = False):
        """Run ``sql``, write the result to the Delta table at ``path`` with ``mode``, re-point the view, and return the DeltaTable."""
        if drop_view_first:
            self.con.sql(f"DROP VIEW IF EXISTS {view_name}")
        batches = self.con.sql(sql).record_batch()
        write_deltalake(path, batches, mode=mode)
        self.con.sql(f"CREATE OR REPLACE VIEW {view_name} AS SELECT * FROM delta_scan('{path}')")
        return DeltaTable(path)

    def _run_sql_task(self, table: str, mode: str, params: Optional[Dict] = None) -> int:
        """
        Run ``<table>.sql`` and write its result to the Delta table ``<schema>/<base name>``.

        The target table is the part of ``table`` before the first ``__``.

        Modes:
            overwrite: replace the table, then vacuum with zero retention.
            append:    add rows; compact + vacuum when the file count exceeds ``compaction_threshold``.
            ignore:    do nothing if the table can be opened, otherwise create it.

        Returns:
            1 on success, 0 on any failure (the reason is printed).
        """
        self._create_onelake_secret()
        allowed_modes = {'overwrite', 'append', 'ignore'}
        if mode not in allowed_modes:
            print(f"Invalid mode '{mode}'. Use: {allowed_modes}")
            return 0

        sql = self._read_sql_file(table, params)  # loads table.sql or table__anything.sql as-is
        if sql is None:
            return 0

        normalized_table = self._normalize_table_name(table)
        path = f"{self.table_base_url}{self.schema}/{normalized_table}"

        try:
            if mode == 'overwrite':
                dt = self._write_and_register(sql, path, normalized_table, 'overwrite', drop_view_first=True)
                dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)
                dt.cleanup_metadata()

            elif mode == 'append':
                dt = self._write_and_register(sql, path, normalized_table, 'append')
                file_count = len(dt.file_uris())  # list the files once, reuse for the message
                if file_count > self.compaction_threshold:
                    print(f"Compacting {normalized_table} (files: {file_count})")
                    dt.optimize.compact()
                    dt.vacuum(dry_run=False)
                    dt.cleanup_metadata()

            elif mode == 'ignore':
                # NOTE(review): any failure to open the table (including auth or network
                # errors) is treated as "table does not exist" — confirm this is intended.
                try:
                    DeltaTable(path)
                    table_exists = True
                except Exception:
                    table_exists = False

                if table_exists:
                    print(f"Table {normalized_table} exists. Skipping (mode='ignore').")
                else:
                    print(f"{normalized_table} doesn't exist. Creating in overwrite mode.")
                    dt = self._write_and_register(sql, path, normalized_table, 'overwrite', drop_view_first=True)
                    dt.vacuum(dry_run=False)
                    dt.cleanup_metadata()

            print(f"✅ SQL task '{table}' → table '{normalized_table}' ({mode}) completed.")
            return 1

        except Exception as e:
            print(f"❌ Error in SQL task '{table}' (writing to '{normalized_table}'): {e}")
            return 0

    def run_pipeline(self, tasks: List[Union[Tuple[str, tuple], Tuple[str, str, Dict]]]) -> bool:
        """
        Run tasks in order with simple syntax:
          - ('download', (url_list, path_list, depth))
          - ('staging_table', 'overwrite', {'run_date': '2024-06-01'})

        A 2-item task is a Python task (a non-tuple/list argument is wrapped into a
        1-tuple); a 3-item task is a SQL task whose third item must be a dict.

        Returns:
            True when every task returned 1; False as soon as a task is malformed
            or returns anything else (remaining tasks are not run).
        """
        for i, task in enumerate(tasks):
            if not task:
                # An empty task has no name to report; reject it rather than
                # raising IndexError on task[0].
                print(f"❌ Invalid task format: {task}")
                return False

            name = task[0]
            print(f"\n--- Running Task {i+1}: {name} ---")

            if len(task) == 2:
                # Python task: ('name', (args,))
                args = task[1]
                if not isinstance(args, (tuple, list)):
                    args = (args,)
                result = self._run_py_task(name, tuple(args))
            elif len(task) == 3:
                # SQL write task: ('table', 'mode', {params})
                mode, params = task[1], task[2]
                if not isinstance(params, dict):
                    print(f"❌ Expected dict as 3rd item in SQL task, got {type(params)}")
                    return False
                result = self._run_sql_task(name, mode, params)
            else:
                print(f"❌ Invalid task format: {task}")
                return False

            if result != 1:
                print(f"❌ Task {i+1} failed. Stopping.")
                return False

        print("\n✅ All tasks completed successfully.")
        return True

    def get_connection(self):
        """Return the underlying DuckDB connection."""
        return self.con

    def sql(self, query: str):
        """
        Execute an arbitrary SQL query on the DuckDB connection.
        Returns the result which can be converted to various formats (.df(), .show(), etc.)

        Example:
            ts.sql("SELECT * FROM my_table LIMIT 10").show()
            df = ts.sql("SELECT COUNT(*) FROM my_table").df()
        """
        return self.con.sql(query)

    def close(self):
        """Close the DuckDB connection, if there is one."""
        if self.con:
            self.con.close()
            print("DuckDB connection closed.")

In [8]:
%%time
con = Tasksql.connect( ws,lh,schema, sql_folder, compaction_threshold  )

Connecting to Lakehouse...
Found 9 Delta tables. Attaching as views...

Attached tables (views) in DuckDB:
┌─────────────┐
│    name     │
│   varchar   │
├─────────────┤
│ calendar    │
│ duid        │
│ mstdatetime │
│ price       │
│ price_today │
│ scada       │
│ scada_today │
│ summary     │
└─────────────┘

CPU times: user 1.22 s, sys: 58.2 ms, total: 1.27 s
Wall time: 4.14 s


In [9]:
# Task lists for Tasksql.run_pipeline:
#   ('script', (args...))         -> Python task
#   ('table', 'mode', {params})   -> SQL task written to a Delta table

# Daily reports: download, load price/scada, refresh dimensions, rebuild the summary.
nightly = [
    (
        'scrapingv2',
        (
            ["https://nemweb.com.au/Reports/Current/Daily_Reports/"],
            ["Reports/Current/Daily_Reports/"],
            nbr_days_download,
            ws,
            lh,
            Nbr_threads,
        ),
    ),
    ('price', 'append', {'ws': ws, 'lh': lh}),
    ('scada', 'append', {'ws': ws, 'lh': lh}),
    ('download_excel', ("raw/", ws, lh)),
    ('duid', 'overwrite', {'ws': ws, 'lh': lh}),
    ('calendar', 'ignore', {}),
    ('mstdatetime', 'ignore', {}),
    ('summary__backfill', 'overwrite', {}),
]

# Today's dispatch data: download the latest files and append them incrementally.
intraday = [
    (
        'scrapingv2',
        (
            [
                "http://nemweb.com.au/Reports/Current/DispatchIS_Reports/",
                "http://nemweb.com.au/Reports/Current/Dispatch_SCADA/",
            ],
            [
                "Reports/Current/DispatchIS_Reports/",
                "Reports/Current/Dispatch_SCADA/",
            ],
            288,
            ws,
            lh,
            Nbr_threads,
        ),
    ),
    ('price_today', 'append', {'ws': ws, 'lh': lh}),
    ('scada_today', 'append', {'ws': ws, 'lh': lh}),
    ('duid', 'ignore', {'ws': ws, 'lh': lh}),
    ('summary__incremental', 'append', {}),
]

# Historical archive hosted on GitHub: download step ...
history_download = [
    (
        'scrapingv2',
        (
            ["https://github.com/djouallah/fabric_demo/tree/main/data/archive/*"],
            ["Reports/Current/Daily_Reports/"],
            remaining_files,
            ws,
            lh,
            Nbr_threads,
        ),
    ),
]

# ... and the processing step that loads the downloaded archive.
history_process = [
    ('scada', 'append', {'ws': ws, 'lh': lh}),
    ('price', 'append', {'ws': ws, 'lh': lh}),
    ('summary__backfill_archive', 'append', {}),
]

In [10]:
%%time
#create lakehouse if not exists
con.run_pipeline([('create_lakehouse_if_not_exists', (lh))])


--- Running Task 1: create_lakehouse_if_not_exists ---
Running Python task: create_lakehouse_if_not_exists('power',)
✅ Python task 'create_lakehouse_if_not_exists' completed.

✅ All tasks completed successfully.
CPU times: user 91.1 ms, sys: 4.83 ms, total: 96 ms
Wall time: 1.08 s


True

In [11]:
%%time
#initial load
con.run_pipeline(nightly)


--- Running Task 1: scrapingv2 ---
Running Python task: scrapingv2(['https://nemweb.com.au/Reports/Current/Daily_Reports/'], ['Reports/Current/Daily_Reports/'], 30, 'temp', 'power', 5)
https://nemweb.com.au/Reports/Current/Daily_Reports/ - 0 files extracted (all 60 files already downloaded)
✅ Python task 'scrapingv2' completed.
❌ Task 1 failed. Stopping.
CPU times: user 96.8 ms, sys: 7 ms, total: 104 ms
Wall time: 1.24 s


False

In [12]:
%%time
#create  semantic model
con.run_pipeline([('deploy_modelv2', (lh,schema,semantic_model,directlake_model))])


--- Running Task 1: deploy_modelv2 ---
Running Python task: deploy_modelv2('power', 'aemo', 'directlake_on_onelake', 'https://raw.githubusercontent.com/djouallah/fabric_demo/refs/heads/main/semantic_model/directlake.bim')
Power BI Semantic Model Deployment

[Step 1/7] Getting workspace information...
✓ Workspace ID: f24e70e8-54ec-491b-9175-2c1aef1138ce

[Step 2/7] Checking if dataset 'directlake_on_onelake' exists...
⚠️  Dataset 'directlake_on_onelake' already exists in this workspace

✓ Dataset 'directlake_on_onelake' already exists - skipping deployment
   Proceeding directly to refresh...

[Step 8/9] Waiting for permission propagation...
   Allowing time for any recent changes to propagate...
   ⏳ 5 seconds remaining...
✓ Wait complete

[Step 9/9] Refreshing semantic model...
   Loading data from lakehouse via DirectLake...
✓ Successfully refreshed semantic model

🎉 Refresh Completed Successfully!

Dataset Name:     directlake_on_onelake
Workspace ID:     f24e70e8-54ec-491b-9175-2c

True

In [13]:
%%time
#load today data
con.run_pipeline(intraday)


--- Running Task 1: scrapingv2 ---
Running Python task: scrapingv2(['http://nemweb.com.au/Reports/Current/DispatchIS_Reports/', 'http://nemweb.com.au/Reports/Current/Dispatch_SCADA/'], ['Reports/Current/DispatchIS_Reports/', 'Reports/Current/Dispatch_SCADA/'], 288, 'temp', 'power', 5)
Flushed 3 files to disk and updated log Reports/Current/DispatchIS_Reports/download_log.csv
Flushed 3 files to disk and updated log Reports/Current/Dispatch_SCADA/download_log.csv
http://nemweb.com.au/Reports/Current/DispatchIS_Reports/ - 3 files extracted and uploaded
http://nemweb.com.au/Reports/Current/Dispatch_SCADA/ - 3 files extracted and uploaded
✅ Python task 'scrapingv2' completed.

--- Running Task 2: price_today ---
✅ SQL task 'price_today' → table 'price_today' (append) completed.

--- Running Task 3: scada_today ---
✅ SQL task 'scada_today' → table 'scada_today' (append) completed.

--- Running Task 4: duid ---
Table duid exists. Skipping (mode='ignore').
✅ SQL task 'duid' → table 'duid' (ig

True

**_<u><mark>Historical files are downloaded from GitHub; this is very slow, but free :)</mark></u>_**

In [14]:
%%time
if remaining_files > 0:
    ## you can turn it off when Historical data is fully loaded
    con.run_pipeline(history_download)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.96 µs


In [15]:
# Process the historical archive only when there was something left to download.
if remaining_files > 0:
    con.run_pipeline(tasks=history_process)

In [16]:
# End the notebook, returning the row count of the summary table as its exit value.
notebookutils.notebook.exit(
    con.sql("select count(*) from summary").fetchone()[0]
)

ExitValue: 3947505