In [None]:
# Library list🤖
import glob, logging, warnings, polars as pl, datetime, os, zipfile, xml.dom.minidom
from datetime import datetime as dt, time as t, timedelta
import pandas as pd, numpy as np, sqlalchemy as sa, xlsxwriter
from sqlalchemy import create_engine, text
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from polars.exceptions import ColumnNotFoundError, PanicException
from pathlib import Path
from IPython.display import HTML
from tabulate import tabulate
# -----------------------------------------------------------------------------------------------#
# --- Logging configuration📜 ---
# Every run writes to its own timestamped log file inside the shared ScriptLogs folder.
log_directory = Path(os.environ['USERPROFILE']) / r'Concentrix Corporation//CNXVN - WFM Team - Documents//DataBase//DataFrame//BKN//ScriptLogs//'
log_directory.mkdir(parents=True, exist_ok=True)
log_filename = log_directory / f"import_log_{dt.now():%Y%m%d_%H%M%S}.log"
# force=True drops handlers left on the root logger by a previous run of this cell,
# so re-running in the same kernel does not duplicate log lines.
logging.basicConfig(
    force=True,
    handlers=[logging.FileHandler(log_filename, encoding='utf-8')],
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,  # one of DEBUG, INFO, WARNING, ERROR, CRITICAL
)
# Named logger shared by every cell below
logger = logging.getLogger('ServerImportScript')
# -----------------------------------------------------------------------------------------------#
# Source collection📥
# Root of the synced WFM Team document library under the current Windows profile.
user_credential = Path(os.environ['USERPROFILE']) / r'Concentrix Corporation//CNXVN - WFM Team - Documents//'
# [BKN]DATA_TRACKER 📑 - output folder for the data-tracker workbook
data_tracker = os.path.join(user_credential, r'DataBase//DataFrame//BKN//DATA_TRACKER')
# Output folder for the ATD exports
atd = os.path.join(user_credential, r'DataBase//DataFrame//BKN//ATD_DF')
# -----------------------------------------------------------------------------------------------#
# Database_Connecter🧬
# Builds the SQLAlchemy engine used by every later cell and verifies that the
# server is actually reachable before the notebook continues.
from urllib.parse import quote_plus

server_name = "PHMANVMDEV01V"
server_ip = "10.5.11.60"
database = "wfm_vn_dev"
# SECURITY NOTE(review): credentials are hard-coded in source. They can be
# overridden with the WFM_DB_USER / WFM_DB_PASSWORD environment variables; the
# literals are kept only as a backward-compatible fallback and should be moved
# to a secret store and rotated.
user = os.environ.get("WFM_DB_USER", "usr_wfmvn_dev")
password = os.environ.get("WFM_DB_PASSWORD", "12guWU2OdEj5kEspl9Rlfoglf")
# SQL Server Authentication 🔗
# quote_plus keeps the URL valid if the user name or password ever contains
# characters such as '@', ':' or '/'.
connection_string = (
    f"mssql+pyodbc://{quote_plus(user)}:{quote_plus(password)}@{server_ip}/{database}"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)
# Windows Authentication 🔗
# connection_string = f"mssql+pyodbc://{server_name}/{database}?driver=ODBC+Driver+17+for+SQL+Server&Trusted_Connection=yes"
try:
    engine = create_engine(connection_string, fast_executemany=True)
    # create_engine() is lazy and opens no connection by itself, so run a
    # trivial query to make the success message below truthful and to surface
    # bad credentials / unreachable server here instead of in a later cell.
    with engine.connect() as _probe_connection:
        _probe_connection.execute(text("SELECT 1"))
    logger.info(f"✅ Successfully connected to DB: {database} server: {server_ip}")
except Exception:
    logger.exception("❌ DB Connection error")
    raise

In [None]:
# Function Definition🛠️

# Log Color view💡
def print_colored(text, color):
    """Show *text* in the notebook output, wrapped in an HTML span coloured *color*.

    *text* is inserted into the markup as-is (no HTML escaping); *color* is used
    verbatim as the CSS ``color`` value.
    """
    markup = f'<span style="color: {color};">{text}</span>'
    display(HTML(markup))

# Check existing log file💡
def _empty_import_log():
    """Return an empty import log with the FileName / ModifiedDate / Error columns."""
    return pl.DataFrame(
        {
            "FileName": pl.Series([], dtype=pl.Utf8),
            "ModifiedDate": pl.Series([], dtype=pl.Datetime),
            "Error": pl.Series([], dtype=pl.Utf8),
        }
    )


def read_or_create_log(log_path):
    """Load the Excel import log at *log_path*, or start an empty one.

    Args:
        log_path: Path of the Excel workbook holding the import log.

    Returns:
        A polars DataFrame. When the workbook is read successfully its
        ``ModifiedDate`` column is cast to millisecond precision. When the file
        is missing, or cannot be read for any other reason, an empty frame with
        the ``FileName``, ``ModifiedDate`` and ``Error`` columns is returned
        instead (the failure is logged, never raised).
    """
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')  # Silence warnings raised while reading the workbook📃
        try:
            logger.debug(f"Reading log file: {log_path}")
            log_df = pl.read_excel(log_path)
            # NOTE: with_columns() treats every keyword argument as a named
            # expression, so the former ``strict=False`` added a boolean column
            # called "strict" rather than changing how the cast behaves.
            log_df = log_df.with_columns(pl.col("ModifiedDate").dt.cast_time_unit("ms"))
            logger.info(f"Success read log file: {log_path}")
        except FileNotFoundError:  # Create a new log when none exists yet📃
            logger.warning(f"Log file not found: {log_path}. Create new log.")
            log_df = _empty_import_log()
        except Exception as e:  # Create a new log when the existing one cannot be opened📃
            logger.exception(f"Error reading log file: {log_path}")
            print(f"Error reading log file: {e}")
            log_df = _empty_import_log()
        return log_df
        
# Update log_df💡
def process_and_save_log(log_df, log_entries, log_path):
    """Merge *log_entries* into *log_df* and write the result to *log_path*.

    Does nothing when *log_entries* is empty. Otherwise the old and new rows are
    combined, sorted by ``ModifiedDate`` ascending, reduced to the last row per
    ``FileName``, ordered by ``FileName`` and written to the "ImportLog"
    worksheet. A failure while writing is printed and logged, not raised.
    """
    if not log_entries:
        return

    fresh_rows = pl.DataFrame(log_entries)
    existing_rows = log_df.with_columns(pl.col('ModifiedDate').dt.cast_time_unit("ms"))
    # Combine the old and new log, keeping a single row per file📃
    combined = pl.concat([existing_rows, fresh_rows], how="diagonal_relaxed")
    one_row_per_file = combined.sort("ModifiedDate", descending=[False]).unique(
        subset=["FileName"], keep="last"
    )
    log_df = one_row_per_file.sort("FileName", descending=[False]).select(
        ["FileName", "ModifiedDate", "Error"]
    )

    try:
        log_df.write_excel(log_path, worksheet="ImportLog", autofit=True)
        print(f"Import log saved to: {log_path}")
        logger.info(f"Import log saved to: {log_path}")
    except Exception as e:
        print(f"Error writing log file: {e}")
        logger.error(f"Error writing log file: {log_path} - {e}")

# write_data💡
def write_data(engine, table_name, df):
    """Append the rows of *df* to *table_name* using *engine* as the connection."""
    df.write_database(
        table_name=table_name,
        connection=engine,
        if_table_exists="append",
    )
    
# delete_data💡
def delete_data(engine, table_name, filename):
    """Delete every row of *table_name* whose ``[FileName]`` equals *filename*.

    The file name is passed as a bound parameter; the table name is interpolated
    into the statement text. Progress is both printed and logged. Any exception
    is logged with its traceback, reported in the notebook, and re-raised.
    """
    try:
        with engine.connect() as connection:
            announce = f"Prepare to delete old data for '{filename}' in '{table_name}'"
            print_colored(announce, "DarkTurquoise")
            logger.warning(announce)
            statement = text(f"DELETE FROM {table_name} WHERE [FileName] = :filename")
            bound_values = {"filename": filename}
            connection.execute(statement, bound_values)
            connection.commit()
            print_colored("Old data deleted successfully🧹", "DarkTurquoise")
            logger.info(f"'{filename}' data deleted successfully in '{table_name}' 🧹.")
    except Exception:
        failure = f"Error while delete data for '{filename}' in '{table_name}'"
        logger.exception(failure)
        print_colored(failure, "DarkTurquoise")
        raise
        
# Check Time💡
def is_time_between(begin_time, end_time, check_time=None):
    """Return True when *check_time* lies in the window [begin_time, end_time].

    Both bounds are inclusive. When ``begin_time`` is not earlier than
    ``end_time`` the window is treated as crossing midnight (e.g. 22:00-06:00).

    Args:
        begin_time: ``datetime.time`` at which the window opens.
        end_time: ``datetime.time`` at which the window closes.
        check_time: ``datetime.time`` to test; defaults to the current UTC time.
    """
    if check_time is None:
        # The file binds ``datetime`` to the module, so the former
        # ``datetime.utcnow()`` raised AttributeError. Import locally and take
        # the current UTC wall-clock time; ``.time()`` returns a naive time that
        # compares with naive bounds.
        from datetime import datetime as _datetime, timezone as _timezone
        check_time = _datetime.now(_timezone.utc).time()
    if begin_time < end_time:
        return begin_time <= check_time <= end_time
    # Window crosses midnight📃
    return check_time >= begin_time or check_time <= end_time
def time_difference(time1, time2):
    """Return ``time1 - time2`` in whole seconds (negative when time1 is earlier).

    Only the hour, minute and second fields are used; microseconds are ignored.
    """
    def _seconds_since_midnight(moment):
        return (moment.hour * 60 + moment.minute) * 60 + moment.second

    return _seconds_since_midnight(time1) - _seconds_since_midnight(time2)

# Final Summary💡
def display_summary(source_name: str, error_count: int) -> None:
    """Print and log the closing message for an import run.

    A positive *error_count* produces a warning that includes the count;
    otherwise a success message is shown. *source_name* appears only in the
    success entry written to the log.
    """
    if error_count > 0:
        with_errors = f"Finished processing all files ({error_count} have errors🛠️)."
        print_colored(with_errors, "OrangeRed")
        logger.warning(with_errors)
        return
    print_colored("Finished processing all files (no errors🎉).", "PaleVioletRed")
    logger.info(f"Finished processing [{source_name}] (no errors🎉).")

# Default_variable💡
def Default_variable():
    """Return a fresh ``(log_entries, error_count)`` pair: an empty list and zero."""
    return [], 0

# parse_date💡
def parse_date(col: pl.Expr) -> pl.Expr:
    """Build an expression that parses a string column into ``pl.Date``.

    Each format below is tried non-strictly and the results are coalesced in
    this order, so an ambiguous value is read with the earliest format in the
    list that accepts it.
    """
    date_formats = (
        "%m/%d/%Y",
        "%Y-%m-%d",
        "%d %B %Y",
        "%B %d, %Y",
        "%d-%b-%y",
        "%Y%m%d",
        "%d/%m/%y",
        "%d-%m-%Y",
    )
    attempts = [
        col.str.strptime(pl.Date, format=date_format, strict=False)
        for date_format in date_formats
    ]
    return pl.coalesce(*attempts)

# validate_schema💡
def validate_schema(df: pl.DataFrame, expected_schema: list[str], filename: str) -> tuple[bool, str | None]:
    """Compare the columns of *df* with *expected_schema* and report the outcome.

    Missing columns are a critical error; extra columns only produce a warning.
    Every step is both logged and shown in the notebook.

    Returns:
        ``(has_critical_error, critical_error_message)``. The message is
        ``None`` when no expected column is missing.
    """
    # Announce the start of validation
    opening = f"🔍 Starting schema validation for file: {filename}"
    logger.info(opening)
    print_colored(opening, "DodgerBlue")

    wanted = set(expected_schema)
    present = set(df.columns)
    missing_columns = wanted - present
    extra_columns = present - wanted
    critical_error_message = None

    # 1. Missing columns are a critical schema error
    if missing_columns:
        critical_error_message = f"Schema error in the file: '{filename}'. Missing columns: {sorted(missing_columns)}"
        logger.error(critical_error_message)
        print_colored(f"❗️ {critical_error_message}", "OrangeRed")

    # 2. Extra columns only trigger a warning
    if extra_columns:
        warning_message = f"warning schema for file '{filename}'. Extra columns: {sorted(extra_columns)}. These columns will be excluded from the import process."
        logger.warning(warning_message)
        print_colored(f"⚠️ {warning_message}", "Gold")

    # 3. Final verdict
    has_critical_error = bool(missing_columns)
    if has_critical_error:
        verdict = f"❌ Schema validation failed due to missing column(s) for file: {filename}."
        logger.warning(verdict)  # logged at warning level; the error itself was logged above
        print_colored(verdict, "OrangeRed")
    elif extra_columns:
        verdict = f"⚠️ File schema check: {filename} Passed (No missing columns, extra columns warned)"
        logger.info(verdict)
        print_colored(verdict, "MediumSeaGreen")  # still shown in green
    else:
        verdict = f"✅ Completely valid schema for the file: {filename}."
        logger.info(verdict)
        print_colored(verdict, "MediumSeaGreen")
    return has_critical_error, critical_error_message
    
# DF Info💡
def info_polars(df: pl.DataFrame):
    """Print and log a summary of *df*: shape, then index, name, non-null count and dtype per column."""
    title = "⚙️Final structure"
    print_colored(title, "Olive")
    logger.info(title)
    print(f"Shape: {df.shape}")
    print("Data columns:")
    table_data = [
        [position, column, df.select(pl.col(column).is_not_null().sum()).item(), dtype]
        for position, (column, dtype) in enumerate(zip(df.columns, df.dtypes))
    ]
    headers = ["#", "Column", "Non-Null Count", "Dtype"]
    rendered = tabulate(table_data, headers=headers, tablefmt="grid")
    print(rendered)
    logger.info(rendered)

In [None]:
# MaintainDatabase🧰
# Runs the BCOM.usp_MaintainDatabase stored procedure (the messages below call
# it the "Index Rebuild" step). Failures are reported but deliberately do not
# stop the notebook; every outcome is also written to the log file so a
# failed run can be diagnosed afterwards.

print_colored("===== Starting Index Rebuild Process =====", "DodgerBlue")
logger.info("===== Starting Index Rebuild Process =====")

MaintainDatabase_sql = """

EXEC BCOM.usp_MaintainDatabase

"""

try:
    with engine.connect() as connection:
        print_colored("⚙️Executing Index Rebuild script (this may take a long time)...", "DarkOrange")
        logger.info("⚙️Executing Index Rebuild script (this may take a long time)...")
        connection.execute(text(MaintainDatabase_sql))
        connection.commit()
        print_colored("✔️Index Rebuild script execution command sent and committed. Check SQL Server logs/output for details.", "MediumSeaGreen")
        logger.info("✔️Index Rebuild script execution command sent and committed.")
except sa.exc.SQLAlchemyError as e_db:
    # logger.exception records the traceback that the notebook output omits
    logger.exception("Database error during Index Rebuild Process")
    print_colored(f"❌ Database error during Index Rebuild Process: {e_db}", "OrangeRed")
except Exception as e_general:
    logger.exception("An unexpected error occurred during Index Rebuild Process")
    print_colored(f"❌ An unexpected error occurred during Index Rebuild Process: {e_general}", "OrangeRed")

print_colored("===== Index Rebuild Process attempt is complete (Python perspective) =====", "DodgerBlue")
logger.info("===== Index Rebuild Process attempt is complete (Python perspective) =====")

In [None]:
# ⚙️Create_EEAAO
# Executes the BCOM.Refresh_EEAAO_Data stored procedure, commits, then reads
# back the first 5 rows of BCOM.EEAAO as a sanity check. Errors are logged and
# printed but not re-raised, so later cells still run.
logger.info("===== Starting EEAAO Process =====")
# EXEC EEAAO Procedure
Exec_EEAAO = """
EXEC BCOM.Refresh_EEAAO_Data;
"""
# Preview query used only to confirm the table holds data after the refresh
select_query = """
SELECT TOP 5 * FROM BCOM.EEAAO;
"""
try:
    with engine.connect() as connection:
        logger.info("⚙️Executing procedure EEAAO ...")
        print("⚙️Executing procedure EEAAO ...")
        connection.execute(text(Exec_EEAAO))
        connection.commit()
        logger.info("✔️Successfully executed and committed Procedure EEAAO.")
        print("✔️Successfully executed and committed Procedure EEAAO.")
        logger.info(f"Reading data from BCOM.EEAAO with query: {select_query.strip()}")
        print(f"Reading data from BCOM.EEAAO with query: {select_query.strip()}")
        # The preview reuses the connection that ran the procedure
        df_eeao_result = pl.read_database(query=select_query, connection=connection)
        if df_eeao_result is not None and not df_eeao_result.is_empty():
            print_colored("Sample data from BCOM.EEAAO after refresh:", "MediumSeaGreen")
            display(df_eeao_result)
            logger.info(f"Successfully read {df_eeao_result.shape[0]} rows from BCOM.EEAAO.")
        else:
            logger.warning("No data returned from BCOM.EEAAO after refresh or procedure did not complete in time.")
            print_colored("No data returned from BCOM.EEAAO after refresh.", "OrangeRed")
except sa.exc.SQLAlchemyError as e:
    # exc_info=True attaches the traceback to the log entry
    logger.error(f"Database error during EEAAO Process: {e}", exc_info=True)
    print(f"Database error: {e}") 
except Exception as e:
    logger.error(f"An unexpected error occurred during EEAAO Process: {e}", exc_info=True)
    print(f"An unexpected error: {e}")
logger.info("===== Processing of EEAAO is complete =====")

In [None]:
# PowerBI Notice🔊
# Builds a small "data tracker" table: one row per BCOM source table holding a
# display name and the latest date found in that table, then saves it as an
# Excel workbook in the DATA_TRACKER folder.

# Each SELECT returns (DISPLAY_NAME, LastestData) for one table; UNION ALL stacks them.
sql_query = """
/* DATA TRACKER  */
 
--EPS
select 'Working Hours' as [DISPLAY_NAME],cast(max([Session Login]) as date) as [LastestData] from BCOM.EPS
union all
--CPI
select 'CPI' as [DISPLAY_NAME],cast(max([Date]) as date) as [LastestData] from BCOM.CPI
union all
--CPI_PEGA
select 'PEGA Swivel' as [DISPLAY_NAME],cast(max([Day of Date]) as date) as [LastestData] from BCOM.CPI_PEGA
union all
--CSAT_RS
select 'CSAT' as [DISPLAY_NAME],cast(max([Sort by Dimension]) as date) as [LastestData] from BCOM.CSAT_RS
union all
--PSAT
select 'PSAT' as [DISPLAY_NAME],cast(max([Sorted by Dimension]) as date) as [LastestData] from BCOM.PSAT
union all
--Quality
select 'QUALITY' as [DISPLAY_NAME],cast(max([eval_date]) as date) as [LastestData] from BCOM.Quality
union all
--AHT2
select 'AHT' as [DISPLAY_NAME],cast(max([Date]) as date) as [LastestData] from BCOM.AHT2
union all
--ROSTER
select 'ROSTER' as [DISPLAY_NAME],cast(max([Attribute]) as date) as [LastestData] from BCOM.ROSTER

"""

# Read Query
datatracker_df = pl.read_database(query=sql_query, connection=engine)
# Releases the engine's pooled connections; the engine object itself is used again by later cells
engine.dispose()
# Store the date as a "YYYY-MM-DD" string
datatracker_df = datatracker_df.with_columns(pl.col("LastestData").dt.strftime("%Y-%m-%d"))
# Export to Excel (the file is written relative to the DATA_TRACKER folder;
# note that os.chdir changes the working directory for the rest of the session)
os.chdir(data_tracker)
datatracker_CSV = datatracker_df.write_excel(workbook="BKN_DATA_TRACKER.xlsx",worksheet="Sheet1")  
# Last expression of the cell: shows the table in the notebook output
datatracker_df

In [None]:
# ATD to TEAMS🔗

# All three queries read BCOM.EEAAO for yesterday's date (relative to the SQL
# Server clock) and exclude the 'OFF', 'Training' and 'New Hire Training' shifts.
# Day fractions: more than 4 hours counts as 1, more than 0 hours as 0.5, otherwise 0.

# Per-employee rows: Present / PlanLeave / UnplanLeave / UPL-Not present at desk.
sql_query_link = """
SELECT [Date],[LOB],[TL_Name],[Emp ID],[Emp_Name] AS [Name],[Shift],
CASE WHEN [ScheduleHours(H)]>4 and [CUICLoggedTime(s)]>0 then 1 
when [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]>0 then 0.5 else 0 end as [Present],
CASE WHEN [SchedLeave(H)]-[SchedUPL(H)]>4 THEN 1 WHEN [SchedLeave(H)]-[SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [PlanLeave],
CASE WHEN [SchedUPL(H)]>4 THEN 1 WHEN [SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [UnplanLeave],
CASE WHEN [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]<=0 then 
(CASE WHEN [ScheduleHours(H)]>4 THEN 1 WHEN [ScheduleHours(H)]>0 THEN 0.5 ELSE 0 END)
else 0 end as [UPL-Not present at desk]
FROM BCOM.EEAAO
where [Date]=DATEADD(DAY, -1,CAST(GETDATE() As Date)) and [Shift]not in ('OFF','Training','New Hire Training')
Order by [TL_Name], [Shift] DESC
"""
# Totals per date: headcount, present, leave buckets and Present% (0 when headcount is 0).
sql_query_overall = """
with atd as (
SELECT [Date],[LOB],[TL_Name],[Emp ID],[Emp_Name] AS [Name],[week_shift],[Shift],
CASE WHEN [ScheduleHours(H)]>4 and [CUICLoggedTime(s)]>0 then 1 
when [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]>0 then 0.5 else 0 end as [ATD],
CASE WHEN [ScheduleHours(H)]>4 THEN 1 WHEN [ScheduleHours(H)]>0 THEN 0.5 ELSE 0 END AS [NormalShift],
CASE WHEN [SchedLeave(H)]-[SchedUPL(H)]>4 THEN 1 WHEN [SchedLeave(H)]-[SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [PlanLeave],
CASE WHEN [SchedUPL(H)]>4 THEN 1 WHEN [SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [UnplanLeave],
CASE WHEN [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]<=0 then 
(CASE WHEN [ScheduleHours(H)]>4 THEN 1 WHEN [ScheduleHours(H)]>0 THEN 0.5 ELSE 0 END)
else 0 end as [UPL-Not present at desk],
CASE WHEN [Shift]='OFF' then 0 else 1 end as [Headcount]
FROM BCOM.EEAAO
where [Date]=DATEADD(DAY, -1,CAST(GETDATE() As Date)) and [Shift] not in ('OFF','Training','New Hire Training'))
select [Date],
sum([Headcount]) as [Headcount],sum([ATD]) as [Present],sum([UPL-Not present at desk]) as [UPL-Not present at desk],
sum([UnplanLeave]) as [UPL-Request within 7 days],sum([PlanLeave]) as [PlanLeave],
case when sum([Headcount])=0 then 0 else sum([ATD])/sum([Headcount]) end as [Present%]
from atd
group by [Date]
"""
# Same aggregation per date and team leader (TL_Name), ordered by Present% ascending.
sql_query_detail = """
with atd as (
SELECT [Date],[LOB],[TL_Name],[Emp ID],[Emp_Name] AS [Name],[week_shift],[Shift],
CASE WHEN [ScheduleHours(H)]>4 and [CUICLoggedTime(s)]>0 then 1 
when [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]>0 then 0.5 else 0 end as [ATD],
CASE WHEN [ScheduleHours(H)]>4 THEN 1 WHEN [ScheduleHours(H)]>0 THEN 0.5 ELSE 0 END AS [NormalShift],
CASE WHEN [SchedLeave(H)]-[SchedUPL(H)]>4 THEN 1 WHEN [SchedLeave(H)]-[SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [PlanLeave],
CASE WHEN [SchedUPL(H)]>4 THEN 1 WHEN [SchedUPL(H)]>0 THEN 0.5 ELSE 0 END AS [UnplanLeave],
CASE WHEN [ScheduleHours(H)]>0 and [CUICLoggedTime(s)]<=0 then 
(CASE WHEN [ScheduleHours(H)]>4 THEN 1 WHEN [ScheduleHours(H)]>0 THEN 0.5 ELSE 0 END)
else 0 end as [UPL-Not present at desk],
CASE WHEN [Shift]='OFF' then 0 else 1 end as [Headcount]
FROM BCOM.EEAAO
where [Date]=DATEADD(DAY, -1,CAST(GETDATE() As Date)) and [Shift] not in ('OFF','Training','New Hire Training'))
select [Date],[TL_Name],
sum([Headcount]) as [Headcount],sum([ATD]) as [Present],sum([UPL-Not present at desk]) as [UPL-Not present at desk],
sum([UnplanLeave]) as [UPL-Request within 7 days],sum([PlanLeave]) as [PlanLeave],
case when sum([Headcount])=0 then 0 else sum([ATD])/sum([Headcount]) end as [Present%]
from atd
group by [Date],[TL_Name]
order by case when sum([Headcount])=0 then 0 else sum([ATD])/sum([Headcount]) end asc
"""

# Read Query
atd_link = pl.read_database(query=sql_query_link, connection=engine)
atd_overall = pl.read_database(query=sql_query_overall, connection=engine)
atd_detail = pl.read_database(query=sql_query_detail, connection=engine)
# Render the Date column of both pivots as "YYYY-MM-DD" text
atd_overall = atd_overall.with_columns(pl.col("Date").dt.to_string("%Y-%m-%d"))
atd_detail = atd_detail.with_columns(pl.col("Date").dt.to_string("%Y-%m-%d"))
# Round-trip through pandas only to format Present% as a percentage string with two decimals
atd_overall = atd_overall.to_pandas()
atd_detail = atd_detail.to_pandas()
atd_overall['Present%'] = atd_overall['Present%'].map('{:.2%}'.format)
atd_detail['Present%'] = atd_detail['Present%'].map('{:.2%}'.format)
atd_overall = pl.from_pandas(atd_overall)
atd_detail = pl.from_pandas(atd_detail)
# Export into the ATD_DF folder: the per-employee table as CSV, the two pivots as
# Excel workbooks (the *_CSV variable names are historical, not a file format)
os.chdir(atd)
atd_link_CSV = atd_link.write_csv("BKN_ATD_DF.csv")  
atd_overall_CSV = atd_overall.write_excel(workbook="BKN_pivot_ATD_DF_total.xlsx",worksheet="Sheet1",
                                           table_name='Frame0', table_style='Table Style Medium 2',autofit=True) 
atd_detail_CSV = atd_detail.write_excel(workbook="BKN_pivot_ATD_DF.xlsx",worksheet="Sheet1",
                                           table_name='Frame0', table_style='Table Style Medium 2',autofit=True)  
# Display
display(atd_link)
display(atd_overall)
display(atd_detail)

In [None]:
# Close DB📃
# Release every pooled connection held by the engine now that all queries have run
engine.dispose()
print("Database connection closed.")
%reset -f