In [1]:
# append custom function path and bring in lookup
import sys
sys.path.append('/home/asha/airflow/dags/custom_functions')
from lookup_support_provider import lookup_support_provider

# packages
import duckdb
import time
import os
import json
import shutil
import logging
import pandas as pd
import numpy as np
import warnings
import re
from pathlib import Path
from datetime import datetime

In [2]:
warnings.simplefilter(action='ignore', category=UserWarning)

In [3]:
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [4]:
database_schema = [
    'Tenant_SK',
    'PropertyAddress',
    'Room', 
    'FirstName', 
    'MiddleName', 
    'LastName',
    'DateOfBirth', 
    'NINumber', 
    'CheckinDate', 
    'CheckoutDate',
    'NewHBClaim',
    'HBClaimRefNumber', 
    'ReferralAgency',
    'GroupedReferralAgency',
    'Age', 
    'Gender', 
    'Religion',
    'Ethnicity', 
    'Nationality', 
    'Disability', 
    'SexualOrientation',
    'SpokenLanguage', 
    'RiskAssessment', 
    'LengthOfStay', 
    'CycleNumber',
    'CycleNumberValue',
    'Source',
    'ExtractedProviderName',
    'ProviderName',
    'LoadDate'
]

In [5]:
# TODO: Get cycle number from the file source

# Regular expression pattern to match "C" followed by one or more digits
pattern = r'C(\d+)'

# establish the load date
load_date = datetime.now()

from functools import wraps

def replace_text(text):
    """Escapes single quotes and handles other special characters."""
    if isinstance(text, str):
        return text.replace("'", "''")  # Double single quotes for MySQL
    return text

# motherduck config
server_config = "/home/asha/airflow/duckdb-config.json"

with open(server_config, "r") as fp:
    config = json.load(fp)
token = config['token']

def log_execution(func):
    """
    """
    
    @wraps(func)
    def etl_task_time(*args, **kwargs):
        start_time = time.time()
        print(f"Starting '{func.__name__}'...")
        result = func(*args, **kwargs)
        print(f"Finished '{func.__name__}' in {time.time() - start_time} seconds.")
        return result

    return etl_task_time

def motherduck_connection(token):
    def connection_decorator(func):
        con = duckdb.connect(f'md:?motherduck_token={token}')
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            # pass con as a keyword argument for use in other functions
            return func(*args, con=con, **kwargs)
    
        return wrapper
    return connection_decorator

@log_execution
@motherduck_connection(token=token)
def job_load_tenant_data(bronze_schema, bronze_table_name, con, **kwargs):
    """
    """

    # connect to motherduck
    con.sql("USE asha_production;")
    con.sql(f"CREATE SCHEMA IF NOT EXISTS {bronze_schema};")
    
    # iterate over files
    all_files = []
    p = Path(os.getcwd())
    source_path = p / 'source'
    processing_path = p / 'processing'
    processed_path = p / 'processed'
    error_path = p / 'errors'
    
    load_date = datetime.now()
    for root, dirs, files in os.walk(source_path):
        for dir in dirs:
                dir_path = os.path.join(root, dir)
                for file in os.listdir(dir_path):
                    file_path = os.path.join(dir_path, file)
                    if os.path.isfile(file_path) and file.endswith('.xlsx'):
                        source_dir = os.path.join(source_path, dir, file)
                        processing_dir = os.path.join(processing_path, dir)
                        processed_dir = os.path.join(processed_path, dir)
                        error_dir = os.path.join(error_path, dir)
                        
                        os.makedirs(processing_dir, exist_ok=True)
                        os.makedirs(processed_dir, exist_ok=True)
                        os.makedirs(error_dir, exist_ok=True)
                        
                        # process the file
                        processing_file_path = os.path.join(processing_dir, file)
                        shutil.move(source_dir, processing_file_path)
                        excel_file = pd.ExcelFile(processing_file_path)
                        
                        # only grab the sheet where the expected columns are present
                        selected_sheet = None
                        for sheet in excel_file.sheet_names:
                            first_row = pd.read_excel(excel_file, sheet, nrows=1)
                            column_headers = first_row.columns.tolist()
                            
                            key_columns = ["PropertyAddress", "FirstName", "LastName", "Ethnicity", "Nationality"]

                            if set(key_columns) <= set(column_headers):  # Check if all key columns are present in headers
                                selected_sheet = sheet
                                break
                            
                        # If a sheet containing the BC column headers is found, load it into a DataFrame
                        try:
                            if selected_sheet is not None:
                                df = pd.read_excel(excel_file, selected_sheet)
                                df = df.dropna(subset=key_columns)
                                
                                # trim the dataframe if an object is a string, removing whitespace
                                df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
                                
                                # this gets us the cycle number and value
                                df['CycleNumber'] = dir
                                cycle_pattern = r'[^\d]+(\d+)[^\d]+'
                                cycle_number = re.sub(cycle_pattern, r'\1', dir)
                                df['CycleNumberValue'] = cycle_number
                                
                                # this sections gets us the source file for the record, as well as the provider
                                source_pattern = r'(?i)C\d+ TL'
                                df['Source'] = file.replace('.xlsx','')
                                df['ExtractedProviderName'] = df['Source'].str.replace(source_pattern, '', regex=True)
                                df['ProviderName'] = df['ExtractedProviderName'].apply(lambda x: lookup_support_provider(x))
                        
                                # add load date
                                df['LoadDate'] = load_date
                            
                                # adds group referral agency for later - this is cleaned by a seperate process in airflow
                                df['GroupedReferralAgency'] = df['ReferralAgency']
                                
                                # this section creates a UID for each tenant, by concating strings and then shortening to length
                                df['Tenant_SK'] = df['LastName'].replace(' ','').fillna(df['FirstName']).astype(str).str.cat(df['NINumber'].replace(' ','').fillna(df['HBClaimRefNumber']).replace(' ','').astype(str), sep='_') # worst code ever
                                df['Tenant_SK'] = df['Tenant_SK'].str.upper()
                                df['Tenant_SK'] = df['Tenant_SK'].str.slice(0,100) # shorten
                                
                                # drop any duplicate SKs
                                df = df.drop_duplicates(subset=['Tenant_SK'], keep='last')

                                all_files.append(df)
                                processed_file_path = os.path.join(processed_dir, file)
                                shutil.move(processing_file_path, processed_file_path)
                                print(f"Processed -> {file}")
                                
                                # check if any records are incorrect - if so, move to errors
                                # error_check = df.loc[df['RecordStatus'] != 'Correct']
                                # error_check = (df['RecordStatus'] != 'Correct').any()
                                # if error_check:
                                #     shutil.move(processing_dir, error_dir)
                                #     logging.error(f"{file} -> failed due to not passing record checks")
                                # else:
                                #     # if no errors, move to processed
                                #     processed_data.append(df)
                                #     shutil.move(processing_dir, processed_dir)     
                                
                            else: # if no sheet found, move to errors
                                error_file_path = os.path.join(error_dir, file)
                                shutil.move(processing_file_path, error_file_path)
                                logging.error(f"{file} -> failed due to mismatched column headers\n")
                        except Exception as e:
                            print(f"Found error with {file} -> {e}")
                            pass
        
    # concat everything together
    fdf = pd.concat(all_files)
    
    # if the column does not exist, create it as a None value
    try:
        fdf = fdf[database_schema]
    except:
        for col in database_schema:
            if col not in fdf.columns:
                fdf[col] = None
        fdf = fdf[database_schema]
    fdf = fdf.replace(to_replace={np.nan : "UNKNOWN", '' : "UNKNOWN", ' ' : "UNKNOWN", "nan" : "UNKNOWN"})
    for col in fdf.columns:
        fdf[col] = fdf[col].apply(lambda x: replace_text(str(x)))
    
    # minor transformation
    column_data_types = {
        'Tenant_SK' : 'VARCHAR (100)',
        'PropertyAddress' : 'VARCHAR (100)',
        'Room' : 'VARCHAR(30)', 
        'FirstName' : 'VARCHAR(100)', 
        'MiddleName': 'VARCHAR(100)', 
        'LastName': 'VARCHAR(100)',
        'DateOfBirth' : 'VARCHAR(30)', 
        'NINumber' : 'VARCHAR(30)', 
        'CheckinDate' : 'VARCHAR(30)', 
        'CheckoutDate': 'VARCHAR(30)' ,
        'NewHBClaim': 'VARCHAR(30)',
        'HBClaimRefNumber': 'VARCHAR(100)', 
        'ReferralAgency': 'VARCHAR(100)',
        'GroupedReferralAgency' : 'VARCHAR(100)',
        'Age' : "VARCHAR(10)", 
        'Gender' : 'VARCHAR(30)', 
        'Religion' : 'VARCHAR(200)',
        'Ethnicity' : 'VARCHAR(200)', 
        'Nationality' : 'VARCHAR(200)', 
        'Disability' : 'VARCHAR(200)', 
        'SexualOrientation' : 'VARCHAR(100)',
        'SpokenLanguage': 'VARCHAR(100)', 
        'RiskAssessment': 'VARCHAR(100)', 
        'LengthOfStay': 'VARCHAR(100)', 
        'CycleNumber' : 'VARCHAR(100)',
        'CycleNumberValue' : 'INT',
        'Source' : 'VARCHAR(100)',
        'ExtractedProviderName': 'VARCHAR(100)',
        'ProviderName' : 'VARCHAR(100)',
        'LoadDate': 'DATETIME'
    }
    
    # write to motherduck
    con.sql(f"INSERT INTO {bronze_schema}.{bronze_table_name} AS SELECT * FROM fdf;")
    con.close()
        
if __name__ == '__main__':
    
    # this is the ETL task
    bronze_schema = 'bronze'
    bronze_table_name = 'tenant_data'
    
    job_load_tenant_data(
        token=token,
        bronze_schema=bronze_schema,
        bronze_table_name=bronze_table_name
    )

Starting 'job_load_tenant_data'...
Processed -> Carefuge C81 TL.xlsx
Processed -> Mercian Heart C81 TL.xlsx
Processed -> Lifestyle C81 TL.xlsx
Processed -> Peratree_ C81 TL (1).xlsx
Processed -> National C81 TL.xlsx
Processed -> Comfy C81 TL.xlsx
Processed -> MATTYS C81 TL.xlsx
Processed -> Surecity C81 TL.xlsx
Processed -> Restart C81 TL.xlsx
Processed -> Input C81 TL.xlsx
Processed -> Peratree C81 TL.xlsx
Processed -> Amber Supported C81 TL.xlsx
Processed -> Smartmove C81 TL.xlsx
Processed -> Positive C81 TL.xlsx
Processed -> HM C81 TL.xlsx
Processed -> Access Housing_ C81 TL (1).xlsx
Processed -> Local C81 TL.xlsx
Processed -> Helping Hands C81 TL.xlsx
Processed -> Umbrella C81 TL.xlsx
Processed -> Inspired C81 TL.xlsx
Processed -> Serenity C81 TL.xlsx
Processed -> DESTINY C81 TL.xlsx
Processed -> Sheild C81 TL.xlsx
Processed -> Elite C81 TL.xlsx
Processed -> Holte  C81 TL.xlsx
Processed -> HM Housing C81 TL.xlsx
Processed -> Access Housing_ C81 TL.xlsx
Processed -> Aspect C81 TL.xl

: 