# LAB VISUAL ETL TRANSFORMATION SCRIPT

In [None]:
import os
import pyodbc
import pandas as pd
from sqlalchemy import create_engine
from data_task_helpers import something
something()
from api_data_task_executioner.data_task_tools import initialise_data_task, DataTaskEnvironment, find_json_arg  # noqa: E402
import nest_asyncio
import sys
import time
from datetime import datetime

# Allow re-entrant use of the asyncio event loop. Presumably needed because the
# task helpers drive async code and this may run inside a notebook kernel whose
# loop is already running — TODO confirm.
nest_asyncio.apply()

# Run-wide state shared by every cell below. `environment` is only populated
# when this file runs as the main program.
params = {}
environment: DataTaskEnvironment = None
timestamp = f"{datetime.now():%d/%m/%Y %H:%M:%S}"

if __name__ == "__main__":
    startup_message = f"Starting Tille Lab transformation {timestamp}"
    environment = initialise_data_task(startup_message, params=params)
    cli_args = find_json_arg(sys.argv)
    # Pause (seconds) inserted between steps; overridable via the JSON argument.
    params["sleep_time"] = cli_args.get("sleep_time", 1.5)

time.sleep(params["sleep_time"])

## MySQL Server Configuration

In [None]:
from urllib.parse import quote

mysql_config = {
    "MYSQL_USER": "{mysql_user}",
    "MYSQL_PASSWORD": "{mysql_pasword}",
    "MYSQL_HOST": "localhost",
    "MYSQL_DATABASE_NAME": "labdashdb",
    "MYSQL_PORT": "3306",
}

# Percent-encode the password so characters such as '@', ':' or '/' in it are not
# mistaken for URL delimiters. SQLAlchemy decodes it again when parsing the URL.
mysql_password_escaped = quote(mysql_config["MYSQL_PASSWORD"], safe="")
mysql_url = (
    f"mysql+pymysql://{mysql_config['MYSQL_USER']}:{mysql_password_escaped}"
    f"@{mysql_config['MYSQL_HOST']}:{mysql_config['MYSQL_PORT']}/{mysql_config['MYSQL_DATABASE_NAME']}"
)

try:
    mysql_conn = create_engine(mysql_url)
    # create_engine() only configures a connection pool; nothing is contacted
    # until first use. Open (and immediately release) one connection here so bad
    # credentials or an unreachable host fail in this cell, instead of being
    # reported as "connected successfully" and failing in the first extraction.
    with mysql_conn.connect():
        pass
    environment.log_message(f"connected successfully to tille lab database: {mysql_config['MYSQL_DATABASE_NAME']}")
    time.sleep(params["sleep_time"])
except Exception as e:
    environment.log_error(f"Error connecting to MySQL: {e}")
    sys.stdout.flush()
    sys.exit(1)

## SQL Server (MSSQL) Configuration

In [None]:
mssql_config = {
    "server": "localhost",
    "database": "master",
    "username": "{mssql_user}",
    "password": "{mssql_pasword}",
}

# ODBC connection strings are split on ';'. Brace-quoting the credentials (with
# any '}' doubled, per the ODBC escaping rule) keeps a password containing
# ';', '{' or '}' from corrupting the string.
mssql_uid = "{" + mssql_config["username"].replace("}", "}}") + "}"
mssql_pwd = "{" + mssql_config["password"].replace("}", "}}") + "}"

try:
    mssql_conn = pyodbc.connect(
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={mssql_config['server']};"
        f"DATABASE={mssql_config['database']};"
        f"UID={mssql_uid};"
        f"PWD={mssql_pwd};"
        "TrustServerCertificate=yes;",
        # Autocommit is required: CREATE DATABASE (run later in this notebook)
        # is not allowed inside a transaction.
        autocommit=True,
    )
    mssql_cursor = mssql_conn.cursor()
except pyodbc.Error as e:
    # Report the failure through the task environment, as the MySQL cell does,
    # instead of dying with an unlogged traceback.
    environment.log_error(f"Error connecting to SQL Server: {e}")
    sys.stdout.flush()
    sys.exit(1)

## Create Schemas

In [None]:
def create_schemas():
    """Ensure the ETL's schemas exist inside ``lab_visual_analysis``.

    For each of ``source``, ``derived``, ``final``, ``z`` and ``dbo``, a batch
    is sent that first switches to ``lab_visual_analysis``. It then creates the
    schema only when ``sys.schemas`` has no row with that name, so re-running
    is harmless.

    Any failure is logged and terminates the process with exit code 1.
    """
    required_schemas = ("source", "derived", "final", "z", "dbo")
    try:
        for schema_name in required_schemas:
            # CREATE SCHEMA has to start its own batch in T-SQL, hence the
            # EXEC(...) wrapper inside the IF.
            ensure_schema_sql = f"""
                    USE lab_visual_analysis;

                    IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema_name}')
                    BEGIN
                        EXEC('CREATE SCHEMA {schema_name}');
                    END
                """
            mssql_cursor.execute(ensure_schema_sql)
        mssql_conn.commit()
        time.sleep(params["sleep_time"])
    except Exception as exc:
        environment.log_error(f"Error creating schemas: {exc}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)

## Create Tables in the lab_visual_analysis database

In [None]:
def create_tables():
    """Create the four ``source`` staging tables when they are missing.

    Each DDL statement is guarded by an ``IF NOT EXISTS`` lookup in
    ``sys.tables``, so running it again is harmless. A statement that fails is
    only logged. The remaining tables are still attempted and the function
    returns normally.
    """
    ddl_statements = [
        """
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'tbl_Facilities' AND schema_id = SCHEMA_ID('source'))
            CREATE TABLE source.tbl_Facilities (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                HfrCode NVARCHAR(50),
                Name NVARCHAR(255),
                Region NVARCHAR(255),
                District NVARCHAR(255),
                Council NVARCHAR(255)
            );
        """,
        """
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'tbl_Device_Logs' AND schema_id = SCHEMA_ID('source'))
            CREATE TABLE source.tbl_Device_Logs (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                DeviceName NVARCHAR(255),
                DeviceCode NVARCHAR(50),
                DateBrokenDown DATETIME2,
                DateReported DATETIME2,
                DateFixed DATETIME2,
                BreakDownReason NVARCHAR(255)
            );
        """,
        """
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'tbl_Commodity_Transactions' AND schema_id = SCHEMA_ID('source'))
            CREATE TABLE source.tbl_Commodity_Transactions (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                CommodityName NVARCHAR(255),
                CommodityCode NVARCHAR(50),
                BatchNumber NVARCHAR(50),
                TransactionDate DATETIME2,
                ExpireDate DATETIME2,
                TransactionType NVARCHAR(50),
                TransactionQuantity INT
            );
        """,
        """
            IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'tbl_Sample' AND schema_id = SCHEMA_ID('source'))
            CREATE TABLE source.tbl_Sample (
                Id INT IDENTITY(1,1) PRIMARY KEY,
                sampletrackingid NVARCHAR(50),
                LabHfrCode NVARCHAR(50),
                HubHfrCode NVARCHAR(50),
                EntryModality NVARCHAR(50),
                SampleType NVARCHAR(255),
                TestName NVARCHAR(255),   
                SampleQualityStatus NVARCHAR(50),
                Results NVARCHAR(255),
                SampleRejectionReason NVARCHAR(255),
                DeviceName NVARCHAR(255),
                DeviceCode NVARCHAR(50),
                CollectionDate DATETIME2,
                ReceivedDate DATETIME2,
                TestDate DATETIME2,
                AuthorisedDate DATETIME2,
                DispatchDate DATETIME2
            );
        """
    ]

    # Best-effort per table: log the problem and move on to the next DDL.
    for ddl in ddl_statements:
        try:
            mssql_cursor.execute(ddl)
        except Exception as err:
            environment.log_error(f"Error creating table: {err}")

    mssql_conn.commit()
    time.sleep(params["sleep_time"])


## Create or reset the lab_visual_analysis database

In [None]:
def create_lab_visual_analysis_database():
    """Create the ``lab_visual_analysis`` database on first run, or reset it later.

    The SQL Server connection is opened against ``master`` (see
    ``mssql_config``). The session is therefore switched to
    ``lab_visual_analysis`` explicitly before any ``source.*`` table is
    touched, on both the first-run and the existing-database path. All the
    load cells that follow rely on that session database.

    Schemas and tables are then ensured through ``create_schemas`` and
    ``create_tables``. Both guard every CREATE with ``IF NOT EXISTS``, so
    calling them on an existing database is harmless, and it repairs a database
    left half-built by an earlier failed run. When the database already
    existed, the four ``source`` tables are truncated so the loads start empty.

    Any failure is logged and terminates the process with exit code 1.
    """
    try:
        environment.log_message("Preparing Lab Visual Database")
        time.sleep(params["sleep_time"])

        mssql_cursor.execute("SELECT name FROM sys.databases WHERE name = 'lab_visual_analysis'")
        db_exists = mssql_cursor.fetchone()

        if not db_exists:
            mssql_cursor.execute("CREATE DATABASE lab_visual_analysis")
            mssql_conn.commit()

        # The connection string targets master. Without this switch the
        # TRUNCATEs below, and every later insert, would address master.
        mssql_cursor.execute("USE lab_visual_analysis;")
        mssql_config["database"] = "lab_visual_analysis"

        environment.log_message("Create Schemas")
        create_schemas()

        environment.log_message("Create Tables")
        create_tables()

        if db_exists:
            environment.log_message("Truncating LabVisual database tables")
            tables = ["tbl_Facilities", "tbl_Device_Logs", "tbl_Commodity_Transactions", "tbl_Sample"]

            for table in tables:
                try:
                    mssql_cursor.execute(f"TRUNCATE TABLE source.{table}")
                except Exception as e:
                    environment.log_error(f"Error truncating table {table}: {e}")
                    sys.stdout.flush()
                    sys.exit(1)

            mssql_conn.commit()

        environment.log_message("Lab Visual Database preparation complete")

    except Exception as e:
        environment.log_error(f"Failed to prepare database: {e}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)

create_lab_visual_analysis_database()

## Load Facility Data

In [None]:
def extract_and_insert_facility_data():
    """Load the facility register into ``source.tbl_Facilities``.

    Facilities come from two sources:

    - the facility Excel export next to this notebook;
    - MySQL's ``hubfacilities`` table.

    Both are mapped to (HfrCode, Name, Region, District, Council). The Excel
    rows are placed first and then de-duplicated on HfrCode, keeping the first
    occurrence, so the Excel row wins when both sources list the same code.
    Missing cells are written as SQL NULL.

    Exits the process with status 1 in any of these cases:

    - the Excel file is missing;
    - the MySQL query fails;
    - the insert fails.
    """
    excel_file = "./All_Operating_Health_Facilities_in_Tanzania-Lab-Visual-2021oct22.xlsx"
    columns = ["HfrCode", "Name", "Region", "District", "Council"]

    if not os.path.exists(excel_file):
        environment.log_error(f"Excel file '{excel_file}' not found.")
        sys.stdout.flush()
        sys.exit(1)

    df_excel = pd.read_excel(excel_file)
    df_excel = df_excel.rename(columns={"Facility Number": "HfrCode", "Facility Name": "Name"})
    # .copy() so the Region clean-up below writes to an independent frame rather
    # than to a column slice of the raw sheet (SettingWithCopyWarning).
    df_excel = df_excel[columns].copy()
    # Drop the literal word "Region" from values in the Region column.
    df_excel["Region"] = df_excel["Region"].str.replace("Region", "", regex=False).str.strip()

    mysql_query = """
        SELECT
            mohswid AS HfrCode,
            facilityname AS Name,
            regionname AS Region,
            districtname AS District,
            council AS Council
        FROM hubfacilities
    """
    try:
        df_mysql = pd.read_sql(mysql_query, mysql_conn)
    except Exception as e:
        environment.log_error(f"Error fetching data from MySQL: {e}")
        sys.stdout.flush()
        sys.exit(1)

    # Excel rows first, so drop_duplicates(keep="first") prefers them.
    df_combined = pd.concat([df_excel, df_mysql], ignore_index=True)
    df_combined = df_combined.drop_duplicates(subset=["HfrCode"], keep="first")

    # pandas marks missing cells as NaN/NaT, which pyodbc does not send as NULL
    # (NaN is bound as a float). Convert them to None first.
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in df_combined[columns].itertuples(index=False, name=None)
    ]

    insert_query = """
        INSERT INTO source.tbl_Facilities (HfrCode, Name, Region, District, Council)
        VALUES (?, ?, ?, ?, ?);
    """

    try:
        # pyodbc's executemany rejects an empty parameter list.
        if rows:
            mssql_cursor.executemany(insert_query, rows)
        mssql_conn.commit()
        environment.log_message(f"{len(df_combined)} rows inserted into tbl_Facilities.")
    except Exception as e:
        environment.log_error(f"Error inserting data: {e}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)

    time.sleep(params["sleep_time"])

extract_and_insert_facility_data()

## Load Sample Data

In [None]:
def extract_and_insert_sample_data():
    """Copy recent ``tbl_labtests`` rows from MySQL into ``source.tbl_Sample``.

    A row is selected when any of its seven tracking dates (collection, sent,
    received, registered, tested, authorised, result-sent-to-hub) falls within
    the last two months. ``EntryModality`` is ``'lab'`` for tracking IDs
    starting with ``BC03`` and ``'hub'`` otherwise. ``facilityHfrID`` is routed
    to ``LabHfrCode`` or ``HubHfrCode`` accordingly. ``DispatchDate`` is filled
    from ``resultAuthorisedDate`` (NOTE(review): confirm that is intended).
    Missing values are written as SQL NULL.

    Exits with status 1 when the query returns no rows. Any other failure is
    only logged and the run continues, unlike the other load cells
    (NOTE(review): confirm intended). The connection is in autocommit mode, so
    rollback() cannot undo rows already inserted.
    """
    query = """
        SELECT DISTINCT trackingID as sampletrackingid,
                        facilityHfrID,
                        sampleType as SampleType,
                        testName as TestName,
                        sampleQuality as SampleQualityStatus,
                        rejectionReason as SampleRejectionReason,
                        sampleCollectionDate as CollectionDate,
                        dateReceivedLab as ReceivedDate,
                        results as Results,
                        testedDate as TestDate,
                        resultAuthorisedDate as AuthorisedDate,
                        resultAuthorisedDate as DispatchDate,
                        testInstrument as DeviceName,
                        NULL as DeviceCode,
                        IF(SUBSTR(trackingID, 1, 4) = 'BC03', 'lab', 'hub') as EntryModality
        FROM tbl_labtests
        WHERE sampleCollectionDate >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR dateSentLab >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR dateReceivedLab >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR registeredDate >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR testedDate >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR resultAuthorisedDate >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
        OR dateResultSentHub >= DATE_SUB(CURDATE(), INTERVAL 2 MONTH)
    """

    insert_query = """
        INSERT INTO source.tbl_Sample (sampletrackingid, LabHfrCode, HubHfrCode, EntryModality, SampleType,
                                    TestName, SampleQualityStatus, Results, SampleRejectionReason, DeviceName,
                                    DeviceCode, CollectionDate, ReceivedDate, TestDate, AuthorisedDate, DispatchDate)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    # Same order as the INSERT column list above.
    insert_columns = [
        "sampletrackingid", "LabHfrCode", "HubHfrCode", "EntryModality", "SampleType",
        "TestName", "SampleQualityStatus", "Results", "SampleRejectionReason", "DeviceName",
        "DeviceCode", "CollectionDate", "ReceivedDate", "TestDate", "AuthorisedDate", "DispatchDate",
    ]

    try:
        sample_data = pd.read_sql(query, mysql_conn)
        if sample_data.empty:
            environment.log_error("No sample data found.")
            sys.stdout.flush()
            sys.exit(1)

        # The facility ID belongs in exactly one of the two code columns,
        # depending on where the sample was entered; the other stays empty.
        is_lab_entry = sample_data["EntryModality"] == "lab"
        sample_data["LabHfrCode"] = sample_data["facilityHfrID"].where(is_lab_entry)
        sample_data["HubHfrCode"] = sample_data["facilityHfrID"].where(~is_lab_entry)

        # pandas marks missing values as NaN/NaT (e.g. samples not yet tested),
        # which pyodbc does not send as NULL. Convert them to None first.
        rows = [
            tuple(None if pd.isna(value) else value for value in record)
            for record in sample_data[insert_columns].itertuples(index=False, name=None)
        ]
        mssql_cursor.executemany(insert_query, rows)
        mssql_conn.commit()

        environment.log_message(f"{len(sample_data)} row inserted into tbl_Sample.")
        time.sleep(params["sleep_time"])

    except Exception as e:
        mssql_conn.rollback()
        environment.log_error(f"Error inserting data: {e}")

extract_and_insert_sample_data()
time.sleep(params["sleep_time"])

## Load Device Logs Data

In [None]:
def extract_and_insert_device_log_data():
    """Copy every row of MySQL ``instrumentlogs2`` into ``source.tbl_Device_Logs``.

    No date filter is applied. Missing values (for example a breakdown with no
    fix date yet) are written as SQL NULL. Any read or insert failure is logged
    and ends the process with exit code 1.
    """
    query = """
        select
            deviceName as DeviceName,
            deviceCode as DeviceCode,
            dateBreakDown as DateBrokenDown,
            dateReported as DateReported,
            dateFixed as DateFixed,
            breakDownReason as BreakDownReason
        from
            instrumentlogs2
    """

    insert_query = """
    INSERT INTO source.tbl_Device_Logs (DeviceName, DeviceCode, DateBrokenDown, DateReported, DateFixed, BreakDownReason)
    VALUES (?, ?, ?, ?, ?, ?)
    """
    # Same order as the INSERT column list above.
    insert_columns = ["DeviceName", "DeviceCode", "DateBrokenDown", "DateReported", "DateFixed", "BreakDownReason"]

    try:
        device_logs = pd.read_sql(query, mysql_conn)

        # pandas marks missing values as NaN/NaT, which pyodbc does not send as
        # NULL. Convert them to None first.
        rows = [
            tuple(None if pd.isna(value) else value for value in record)
            for record in device_logs[insert_columns].itertuples(index=False, name=None)
        ]
        # pyodbc's executemany rejects an empty parameter list.
        if rows:
            mssql_cursor.executemany(insert_query, rows)
        mssql_conn.commit()
        environment.log_message(f"{len(rows)} rows inserted into tbl_Device_Logs.")
    except Exception as e:
        environment.log_error(f"Error fetching or inserting device logs: {e}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)

extract_and_insert_device_log_data()
time.sleep(params["sleep_time"])

## Load Commodity Transaction Data

In [None]:
def extract_and_insert_commodity_transaction_data():
    """Copy every row of MySQL ``commoditytransactions`` into ``source.tbl_Commodity_Transactions``.

    No date filter is applied, and missing values are written as SQL NULL.
    Any read or insert failure is logged and ends the process with exit code 1.
    """
    query = """SELECT commodityName AS CommodityName, commodityCode AS CommodityCode, batchNo AS BatchNumber, transactionDate AS TransactionDate,
                    expireDate AS ExpireDate, transactionType AS TransactionType, quantity AS TransactionQuantity FROM commoditytransactions"""

    insert_query = """
    INSERT INTO source.tbl_Commodity_Transactions (CommodityName, CommodityCode, BatchNumber, TransactionDate, ExpireDate, TransactionType, TransactionQuantity)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    # Same order as the INSERT column list above.
    insert_columns = [
        "CommodityName", "CommodityCode", "BatchNumber", "TransactionDate",
        "ExpireDate", "TransactionType", "TransactionQuantity",
    ]

    try:
        commodity_transactions = pd.read_sql(query, mysql_conn)

        # pandas marks missing values as NaN/NaT (e.g. no expiry date), which
        # pyodbc does not send as NULL. Convert them to None first.
        rows = [
            tuple(None if pd.isna(value) else value for value in record)
            for record in commodity_transactions[insert_columns].itertuples(index=False, name=None)
        ]

        # pyodbc's executemany rejects an empty parameter list.
        if rows:
            mssql_cursor.executemany(insert_query, rows)
            mssql_conn.commit()
            environment.log_message(f"{len(commodity_transactions)} rows inserted into tbl_Commodity_Transactions.")
            time.sleep(params["sleep_time"])

    except Exception as e:
        environment.log_error(f"Error fetching or inserting commodity transactions: {e}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)

extract_and_insert_commodity_transaction_data()
time.sleep(params["sleep_time"])

## Create the stored procedures from the SQL file

In [None]:
def extract_stored_procedures_from_file():
    """Run ``./create-stored-procedures.sql`` against SQL Server, batch by batch.

    ``GO`` is a client-side batch separator, not T-SQL, so the script is split
    on lines consisting only of ``GO`` (case-insensitive, optionally followed
    by a ``--`` comment). Batches that are empty or contain only comments are
    skipped. Every other batch, including one that merely starts with a header
    comment, is executed and committed in file order.

    A missing file or the first failing batch is logged and terminates the
    process with exit code 1.
    """
    import re

    environment.log_message("Running ETL")
    time.sleep(params["sleep_time"])

    stored_procedures_file = "./create-stored-procedures.sql"
    if not os.path.exists(stored_procedures_file):
        environment.log_error(f"SQL file '{stored_procedures_file}' not found.")
        sys.stdout.flush()
        sys.exit(1)

    with open(stored_procedures_file, "r") as file:
        sql_commands = file.read()

    # Only a GO standing alone on its line separates batches. Splitting on the
    # bare substring would also cut identifiers such as CATEGORY apart.
    go_separator = re.compile(r"^[ \t]*GO[ \t]*(?:--[^\n]*)?$", re.IGNORECASE | re.MULTILINE)
    # Used only to decide whether a batch has any SQL left once comments are
    # removed. The batch itself is always executed unmodified.
    sql_comment = re.compile(r"--[^\n]*|/\*.*?\*/", re.DOTALL)

    environment.log_message("Add SP")

    for batch in go_separator.split(sql_commands):
        batch = batch.strip()

        if not sql_comment.sub("", batch).strip():
            continue

        try:
            mssql_cursor.execute(batch)
            mssql_conn.commit()
        except Exception as e:
            environment.log_error(f"Error loading batch: {e}")
            mssql_conn.rollback()
            sys.stdout.flush()
            sys.exit(1)

extract_stored_procedures_from_file()

## Execute Data Transformation

In [None]:
def execute_stored_procedures():
    """Run the transformation procedures, then release every database resource.

    The procedures run in the listed order:
    ``dbo`` → ``z`` → ``derived`` → ``final``, each ``sp_data_processing``.
    A failure is logged and terminates the process with exit code 1. The SQL
    Server cursor and connection and the MySQL engine are released in every
    case, so this must be the last database step of the run.
    """
    stage_procedures = (
        "dbo.sp_data_processing",
        "z.sp_data_processing",
        "derived.sp_data_processing",
        "final.sp_data_processing",
    )
    try:
        for procedure in stage_procedures:
            mssql_cursor.execute(f"EXEC {procedure}")
            # Consume every result set the procedure produces; pyodbc only
            # reports errors from later statements once their result is reached.
            while mssql_cursor.nextset():
                pass
        mssql_conn.commit()

    except Exception as e:
        environment.log_error(f"Error executing stored procedure: {e}")
        mssql_conn.rollback()
        sys.stdout.flush()
        sys.exit(1)
    finally:
        # These are module-level globals, so a `locals()` check can never see
        # them. Release unconditionally (this also runs after sys.exit above).
        mssql_cursor.close()
        mssql_conn.close()
        mysql_conn.dispose()

environment.log_message("ETL complete")
execute_stored_procedures()


In [None]:
# Final pause between steps, then report completion, tagged with the timestamp
# captured once at the start of the run.
time.sleep(params["sleep_time"])
environment.log_message(f'Completed Tille Lab Transformation {timestamp}')