In [None]:
from datetime import datetime
import notebookutils
import os
import pandas as pd
import pyarrow as pa

from deltalake import write_deltalake, DeltaTable
from sempy.fabric import get_notebook_workspace_id, list_workspaces  
from zoneinfo import ZoneInfo

In [None]:
# Building paths
workspace_id = get_notebook_workspace_id()  
workspace_name = list_workspaces().query("Id == @workspace_id")['Name'].iloc[0]
print(f'Workspace: {workspace_name} | ID: {workspace_id}')

lakehouse_name = 'LK_Lakehouse'
lakehouse_path = f'abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.Lakehouse'

tables_path = f'{lakehouse_path}/Tables'
print(f'Tables path: {tables_path}') 

landing_path = f'{lakehouse_path}/Files/Landing/Files'
print(f'Files landing path: {landing_path}') 

loaded_path = f'{lakehouse_path}/Files/Loaded'
print(f'Files loaded path: {loaded_path}')


In [None]:
# Parameters to expose in Data Pipeline  
is_incremental = False

In [None]:
# Load CSV files to Delta

# Read files
files = notebookutils.fs.ls(landing_path)

if not files:
    print(f'Any file found in {landing_path}') 

else:
    files_qty = len(files)
    print(f'Found {files_qty} files in {landing_path}')  

    # Creates a empyt list to append each DataFrame
    df_list = [] 

    # Iterate
    for f in files:
        file_name = f.name
        if not file_name.endswith('.csv'):
            continue
        
        date_str = file_name.split('_')[1].replace('.csv', '')        # Extract the date from the filename
        df = pd.read_csv(f'{landing_path}/{file_name}')               # CSV to pandas.DataFrame
        
        df['Date'] = datetime.strptime(date_str, "%Y-%m-%d").date()   # Add the column 'Data'
        
        df_list.append(df)                                            # Append each DataFrame

    # Concat the list in one DataFrame
    df_final = pd.concat(df_list, ignore_index=True)                  

    # Rename with friendly BI column names
    df_final.rename(                                                  
        columns={
            'employee_id': 'EmployeeID',
            'standard_hours': 'StandardHours',
            'worked_hours': 'WorkedHours',
        },
        inplace=True,
    )

    # Reordered columns
    df_final = df_final[['Date', 'EmployeeID', 'StandardHours', 'WorkedHours']]

    # Writing with mode
    mode = 'append' if is_incremental else 'overwrite' 
    write_deltalake(f'{tables_path}/FactAbsenteeism', df_final, mode=mode)   
    print(f'✓ CSV files were loaded to Delta tables with `{mode}` mode successfully!')

    # Extracting max_date to incremental logic
    max_date = df_final['Date'].max() 
    df_max_date = pd.DataFrame({'MaxDate': [max_date]}) 
    write_deltalake(f'{tables_path}/MaxDate', df_max_date, mode='overwrite')

    # Getting datetime to build the path of loaded files
    datetime_str = datetime.now(ZoneInfo('America/Sao_Paulo')).strftime('%Y-%m-%d_%H-%M-%S')
    loaded_path_with_date = f'{loaded_path}/{datetime_str}'

    # Moving files landing from loading  
    notebookutils.fs.mv(landing_path, loaded_path_with_date)   
    print(f'✓ All files moved successfully!')
    print(f'From: {landing_path}')
    print(f'To: {loaded_path_with_date}/Files')  
