In [4]:
import os
import shutil
import warnings

import functools
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sshtunnel import SSHTunnelForwarder, BaseSSHTunnelForwarderError

import logging


# Per-table ETL configuration, keyed by the name of the database table the
# data is written to. Fields of each entry:
#   FOLDER_PATH_IN  - folder scanned for new Excel exports
#   FOLDER_PATH_OUT - archive folder the processed exports are moved to
#   SHEET           - worksheet read from every workbook
#   COL_NAMES       - column names passed to pd.read_excel(names=...)
#   COMPANIES       - 'Company' values whose rows are kept
#   SKIP            - leading rows skipped (pd.read_excel(skiprows=...))
#   IF_EXISTS       - DataFrame.to_sql if_exists mode for the table
DATA = dict(
    sales=dict(
        FOLDER_PATH_IN=r'C:\Users\dmandree\Downloads\TL_new',
        FOLDER_PATH_OUT=r'C:\Users\dmandree\Downloads\TL_arch',
        SHEET='TurnoverList',
        COL_NAMES=[
            'Day', 'Store', 'Company', 'Open', 'Amount', 'Curr', 'Pcs',
            'Rcp', 'People', 'Hours', 'Work', 'Comp:', 'Open_1', 'Amount_1',
            'Curr_1', 'Pcs_1', 'Rcp_1', 'People_1', 'Hours_1', 'Work_1',
        ],
        COMPANIES=['Guess Kazakhstan', 'Guess CIS'],
        SKIP=0,
        IF_EXISTS='append',
    ),
    ms_sales=dict(
        FOLDER_PATH_IN=r'C:\Users\dmandree\Downloads\RTL_new',
        FOLDER_PATH_OUT=r'C:\Users\dmandree\Downloads\RTL_arch',
        SHEET='RTL50000_by_season_by_store old',
        COL_NAMES=[
            'Company', 'Country', 'Day', 'Mfg Season', 'Line Code', 'Gender',
            'Dept Group', 'Dept', 'Sub Dept', 'Class', 'Class_1', 'Style',
            'Style_1', 'Chain', 'Store', 'Store_1', 'Metrics', 'Ttl Sls Qty',
            'TTL Curr Rtl Price €', 'Discount €', 'Ttl Sls €', 'Ttl Cost LC',
            'Ttl Sls Trasp Cost LC', 'Ttl Cost €', 'Ttl Sls LC',
            'Ttl Sls Trasp Cost €',
        ],
        COMPANIES=['RU', 'KZ'],
        SKIP=3,
        IF_EXISTS='append',
    ),
    ms_stock=dict(
        FOLDER_PATH_IN=r'C:\Users\dmandree\Downloads\FNC_new',
        FOLDER_PATH_OUT=r'C:\Users\dmandree\Downloads\FNC_arch',
        SHEET='FNC03-50001-Margin_stock all st',
        COL_NAMES=[
            'Company', 'Day', 'Store', 'Store_1', 'Mfg Season', 'Line Code',
            'Line_Code_1', 'Style', 'Style_1', 'Sub_Dept', 'Sub_Dept_1',
            'Metrics', 'TTL EOH Ttl Qty', 'TTL Loading Cost €',
            'TTL Loading Cost LC', 'TTL Trasp Cost €', 'Cost €',
        ],
        COMPANIES=['RU', 'KZ'],
        SKIP=2,
        IF_EXISTS='replace',
    ),
)


# Path of the Excel workbook that holds the mapping ("dictionary") sheets
DICT_PATH = r'C:\Users\dmandree\OneDrive - Guess Inc\D Project\Dict\Mapping.xlsx'

# Sheets of DICT_PATH that are read into DataFrames
LIST_OF_SHEETS = [
    "Stores",
    "Dist_managers",
    "VM",
    "Fin_Calendar_old",
    "Fin_Calendar_new",
    "Template",
    "Start_date",
]

# DB and SSH connection parameters.
# Every value can be overridden through an environment variable.
# NOTE(review): real passwords (including the root SSH password) were
# hard-coded here and are kept below only as fallback defaults so existing
# runs keep working. Treat them as leaked: rotate them, supply the new values
# via DB_PASSWORD / SSH_PASSWORD, then delete the literal defaults.
DB_PARAMS = {
    'database': os.environ.get('DB_NAME', 'postgres'),
    'user': os.environ.get('DB_USER', 'postgres'),
    'password': os.environ.get('DB_PASSWORD', '1296'),
    'host': os.environ.get('DB_HOST', 'localhost'),
}

SSH_TUNNEL_PARAMS = {
    'ssh_address_or_host': (
        os.environ.get('SSH_HOST', '79.174.86.163'),
        int(os.environ.get('SSH_PORT', '22')),
    ),
    'ssh_username': os.environ.get('SSH_USER', 'root'),
    'ssh_password': os.environ.get('SSH_PASSWORD', 'S0SJcmYwL0ZsmUId'),
    # PostgreSQL on the remote host, forwarded to a fixed local port
    'remote_bind_address': ('127.0.0.1', 5432),
    'local_bind_address': ('127.0.0.1', 8001),
}

# Suppress every UserWarning raised anywhere in the process.
warnings.filterwarnings("ignore", category=UserWarning)

# Root logging at INFO level; one shared format for all output.
logging.basicConfig(level=logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# basicConfig() does nothing when the root logger already has handlers (for
# example on a notebook re-run), so the shared format is applied explicitly to
# whatever handlers the root logger currently has.
for handler in logging.getLogger().handlers:
    handler.setFormatter(formatter)

# Module-level logger used by the helpers below.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Decorator that logs the start and the successful end of every call
def log_function_execution(func):
    """Log an INFO message before and after each call to *func*.

    ``functools.wraps`` copies ``__name__``, ``__doc__`` etc. onto the wrapper.
    Without it, outer decorators such as ``exception`` saw the name
    ``'wrapper'`` and reported every failure as happening "in wrapper".

    Messages go to the root logger (``logging.info``). The "Function executed"
    message is only written when *func* returns normally; exceptions propagate
    unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logging.info(f"'{func.__name__}' - Start function")
        result = func(*args, **kwargs)
        logging.info(f"'{func.__name__}' - Function executed")
        return result
    return wrapper

# Decorator that turns exceptions raised by the wrapped function into log records
def exception(func):
    """Call *func*, logging (instead of raising) any exception it raises.

    On success the wrapped function's result is returned unchanged. On failure
    the error is logged through the module ``logger`` and ``None`` is
    returned, so callers must be prepared to receive ``None``.

    Handlers are ordered from most to least specific. ``pd.errors.ParserError``
    subclasses ``ValueError``, and ``shutil.Error`` subclasses ``OSError``
    (which ``IOError`` is an alias of). Each must therefore be tested before
    its parent, otherwise its handler is unreachable. The previous order also
    made the generic ``OSError`` handler unreachable.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except FileNotFoundError as e:
            logger.error(f"File not found error in {func.__name__}: {e}")
        except PermissionError as e:
            logger.warning(f"Permission error in {func.__name__}: {e}")
        except shutil.Error as e:
            logger.error(f"Error while moving file {func.__name__}: {e}")
        except OSError as e:
            logger.error(f"OS error in {func.__name__}: {e}")
        except pd.errors.ParserError as e:
            logger.error(f"Parser error in {func.__name__}: {e}")
        except ValueError as e:
            logger.error(f"Value error in {func.__name__}: {e}")
        except BaseSSHTunnelForwarderError as e:
            logger.error(f"SSH tunnel error in {func.__name__}: {e}")
        except SQLAlchemyError as e:
            logger.error(f"SQLAlchemy error in {func.__name__}: {e}")
        except Exception as e:
            # Unknown failure: record the traceback so it can be diagnosed.
            logger.exception(f"Unexpected error in {func.__name__}: {e}")
        return None
    return wrapper

# Function to read the Excel exports of one table and archive them
@exception
@log_function_execution
def read_excel_files(FOLDER_PATH_IN, FOLDER_PATH_OUT, SHEET, SKIP, COL_NAMES):
    """Read sheet *SHEET* from every Excel workbook in *FOLDER_PATH_IN*.

    Each workbook is read with ``skiprows=SKIP`` and ``names=COL_NAMES``.
    Workbooks are processed in sorted file-name order and concatenated into a
    single DataFrame with a fresh index. Sub-folders, Office lock files
    (``~$name.xlsx``) and files without an Excel extension are skipped.

    Files are moved to *FOLDER_PATH_OUT* (via ``move_processed_file``) only
    after *all* workbooks were read successfully.

    Returns the concatenated DataFrame, or None when the input folder is
    missing or contains no Excel workbook.
    """
    # Extensions accepted by pandas.read_excel
    excel_extensions = ('.xls', '.xlsx', '.xlsm', '.xlsb', '.odf', '.ods', '.odt')

    if not os.path.exists(FOLDER_PATH_IN):
        logger.error(f"Input folder '{FOLDER_PATH_IN}' does not exist.")
        return None

    excel_files = []
    for file in sorted(os.listdir(FOLDER_PATH_IN)):
        file_path = os.path.join(FOLDER_PATH_IN, file)
        if (os.path.isfile(file_path)
                and file.lower().endswith(excel_extensions)
                and not file.startswith('~$')):
            excel_files.append((file, file_path))
        else:
            logger.info(f"Skipping non-Excel entry '{file}'.")

    if not excel_files:
        logger.info("No Excel files found in the input folder.")
        return None

    # Read everything first. Archiving each file right after reading it meant
    # that a failure on a later file aborted the call (returning None) while
    # the earlier files had already left the input folder, so their data was
    # never loaded.
    dfs = []
    for _, file_path in excel_files:
        with pd.ExcelFile(file_path) as xls:
            dfs.append(pd.read_excel(xls, sheet_name=SHEET, skiprows=SKIP, names=COL_NAMES))

    # NOTE(review): files are still archived before the data reaches the
    # database; a later DB failure leaves them only in FOLDER_PATH_OUT.
    for file, file_path in excel_files:
        move_processed_file(file_path, FOLDER_PATH_OUT, file)

    return pd.concat(dfs, ignore_index=True)
    
# Function to move a processed file into the archive folder
@exception
@log_function_execution
def move_processed_file(file_path, FOLDER_PATH_OUT, file):
    """Move *file_path* into *FOLDER_PATH_OUT*, naming it *file*.

    The archive folder is created when it does not exist. A file already
    present at the destination is deleted first, so the move overwrites it.
    Returns None.
    """
    destination = os.path.join(FOLDER_PATH_OUT, file)

    if not os.path.exists(FOLDER_PATH_OUT):
        os.makedirs(FOLDER_PATH_OUT)

    if os.path.exists(destination):
        os.remove(destination)

    shutil.move(file_path, destination)

# Function to load the dictionary (mapping) sheets
@exception
@log_function_execution
def load_excel_sheets(DICT_PATH, LIST_OF_SHEETS):
    """Read the sheets named in *LIST_OF_SHEETS* from workbook *DICT_PATH*.

    Returns a dict mapping each requested sheet name to its DataFrame, in the
    order of *LIST_OF_SHEETS*. Returns {} when the workbook does not exist or
    no sheet is requested. A missing sheet raises inside pandas (handled by
    the ``exception`` decorator).
    """
    if not os.path.exists(DICT_PATH):
        logger.error(f"The file '{DICT_PATH}' does not exist.")
        return {}

    sheet_names = list(LIST_OF_SHEETS)
    if not sheet_names:
        return {}

    # A single call opens and parses the workbook once; reading sheet by sheet
    # re-opened the whole file for every sheet.
    sheets_data = pd.read_excel(DICT_PATH, sheet_name=sheet_names)
    return {sheet: sheets_data[sheet] for sheet in sheet_names}

# Function to clean and filter the raw data of one table
@exception
@log_function_execution
def process_data(df, COMPANIES):
    """Normalise a DataFrame produced by ``read_excel_files``.

    - 'Day' becomes ``datetime.date``. String values are first cut to their
      last 10 characters with commas and spaces removed. Non-string values
      (e.g. cells Excel already typed as dates) are passed to
      ``pd.to_datetime`` unchanged.
    - Only rows whose 'Company' is in *COMPANIES* are kept.
    - Column names are lower-cased and spaces become underscores.

    The caller's DataFrame is not modified. None or an empty DataFrame is
    returned unchanged.
    """
    if df is None or df.empty:
        return df

    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()

    if df['Day'].dtype == 'O':
        # An object column can mix strings and datetime objects. The `.str`
        # accessor turns every non-string into NaN (and hence NaT), silently
        # dropping those dates, so only the actual strings are cleaned.
        df['Day'] = df['Day'].map(
            lambda value: value[-10:].replace(',', '').replace(' ', '')
            if isinstance(value, str) else value
        )

    df['Day'] = pd.to_datetime(df['Day']).dt.date
    df = df.loc[df['Company'].isin(COMPANIES)].copy()
    df.columns = df.columns.str.lower().str.replace(' ', '_')
    return df

# Function to collect the distinct days of a DataFrame
@exception
@log_function_execution
def create_outer_df(df):
    """Return a one-column DataFrame ('key') of the distinct df['day'] values.

    Values keep the order in which they first appear in *df*.
    """
    distinct_days = df['day'].unique()
    return pd.DataFrame(distinct_days, columns=['key'])

# Function to build the SSH tunnel to the database host
@exception
@log_function_execution
def create_ssh_tunnel():
    """Build an ``SSHTunnelForwarder`` configured from SSH_TUNNEL_PARAMS.

    The forwarder is returned unstarted; the caller starts it, for example by
    using it as a context manager (as ``main`` does).
    """
    return SSHTunnelForwarder(**SSH_TUNNEL_PARAMS)

# Function to create the database engine that connects through the SSH tunnel
@exception
@log_function_execution
def create_db_engine(ssh_tunnel):
    """Create a SQLAlchemy engine for PostgreSQL reached through *ssh_tunnel*.

    The port is the tunnel's ``local_bind_port``; database, user, password and
    host come from DB_PARAMS. The port is also stored in ``DB_PARAMS['port']``,
    as before, for any code that reads it.

    Returns the engine, or None (after logging) when *ssh_tunnel* is None.
    """
    from sqlalchemy.engine import URL

    if ssh_tunnel is None:
        logger.error("SSH tunnel is not established.")
        return None

    DB_PARAMS['port'] = ssh_tunnel.local_bind_port
    # URL.create escapes the credentials. A hand-built f-string URL breaks as
    # soon as the user or password contains '@', ':', '/', '%', etc.
    url = URL.create(
        drivername='postgresql',
        username=DB_PARAMS['user'],
        password=DB_PARAMS['password'],
        host=DB_PARAMS['host'],
        port=DB_PARAMS['port'],
        database=DB_PARAMS['database'],
    )
    return create_engine(url)

# Function to list the days that are about to be (re)loaded
@exception
@log_function_execution
def get_intersections(engine, df):
    """Return the distinct 'day' values of *df* as a list, in first-seen order.

    These are the days whose existing rows are deleted before *df* is loaded.
    No database lookup is needed: ``DELETE ... WHERE day = ANY(:keys)`` only
    touches rows that actually exist.

    *engine* is accepted for backward compatibility and is not used.
    Returns [] for None or an empty DataFrame.
    """
    if df is None or df.empty:
        return []

    # The previous version ran `SELECT DISTINCT day FROM sales` and then
    # immediately overwrote the result with df['day'].unique(). The query was
    # dead work, and it always hit the `sales` table whatever table was being
    # loaded. The effective result (df's distinct days) is returned directly.
    return df['day'].drop_duplicates().tolist()
    
# Function to remove the rows of the days being reloaded
@exception
@log_function_execution
def delete_intersections(session, intersection_df, table_name):
    """Delete rows of *table_name* whose ``day`` is in *intersection_df*.

    intersection_df: list of day values (as produced by ``get_intersections``).

    Does nothing when the list is empty or the table does not exist yet
    (e.g. on the first load). The table name is quoted as an SQL identifier.
    If the DELETE fails, the session is rolled back before the error
    propagates. Otherwise the session would stay in an aborted transaction
    and every later statement on it would fail too.
    """
    from sqlalchemy import inspect

    if not intersection_df:
        logger.info("No intersections to delete.")
        return

    bind = session.get_bind()
    if not inspect(bind).has_table(table_name):
        logger.info(f"Table '{table_name}' does not exist yet; nothing to delete.")
        return

    quoted_table = bind.dialect.identifier_preparer.quote(table_name)
    delete_query = text(f'DELETE FROM {quoted_table} WHERE day = ANY(:keys)')
    try:
        session.execute(delete_query, {'keys': list(intersection_df)})
        session.commit()
    except SQLAlchemyError:
        session.rollback()
        raise

# Function to write a table's data to the database
@exception
@log_function_execution
def load_data_to_db(df, engine, name, IF_EXISTS):
    """Write *df* to table *name* with ``to_sql(if_exists=IF_EXISTS)`` and commit.

    ``engine.begin()`` opens a transaction that is committed when the block
    exits normally and rolled back on error. With a plain ``engine.connect()``,
    whether the inserted rows get committed depends on the pandas/SQLAlchemy
    versions in use (SQLAlchemy 2.x connections do not autocommit).

    A None or empty *df* is skipped: there is nothing to insert, and with
    ``IF_EXISTS='replace'`` it would wipe the existing table.
    """
    if df is None or df.empty:
        logger.info(f"No data to load into '{name}'.")
        return

    with engine.begin() as conn:
        df.to_sql(name, conn, if_exists=IF_EXISTS, index=False)

# Function to write the dictionary sheets to the database
@exception
@log_function_execution
def transform_and_load_dict(engine, dfs):
    """Write every DataFrame in *dfs* to its own table, replacing it.

    dfs: mapping of sheet name -> DataFrame (as returned by
    ``load_excel_sheets``, which yields {} or None when nothing was read).
    Table names and string column names are lower-cased; non-string column
    names are kept as they are. The caller's DataFrames are not modified.
    A None or empty mapping is a no-op.
    """
    if not dfs:
        logger.info("No dictionary data to load.")
        return

    for df_name, df in dfs.items():
        # rename() returns a new frame, so the caller's DataFrame is untouched.
        # `.str.lower()` would turn non-string headers (e.g. numeric ones) into NaN.
        lowered = df.rename(columns=lambda col: col.lower() if isinstance(col, str) else col)
        lowered.to_sql(df_name.lower(), engine, if_exists="replace", index=False)

# Main function
@log_function_execution
def main():
    """Run the ETL end to end through the SSH tunnel.

    For each entry of DATA:
    1. read the Excel exports;
    2. clean them;
    3. delete the days being reloaded from the target table;
    4. write the new data.

    Tables without new data are skipped. After the loop, the mapping workbook
    (DICT_PATH) is loaded and written to the database once.
    """
    tunnel = create_ssh_tunnel()
    if tunnel is None:
        # create_ssh_tunnel logs the cause and returns None on failure.
        logger.error("Could not create the SSH tunnel; aborting.")
        return

    with tunnel as ssh_tunnel:
        engine = create_db_engine(ssh_tunnel)
        if engine is None:
            logger.error("Could not create the database engine; aborting.")
            return

        try:
            Session = sessionmaker(bind=engine)

            with Session() as session:
                for table_name, table_info in DATA.items():
                    logging.info(f"Processing table: {table_name}")

                    df = read_excel_files(
                        table_info["FOLDER_PATH_IN"],
                        table_info["FOLDER_PATH_OUT"],
                        table_info["SHEET"],
                        table_info["SKIP"],
                        table_info["COL_NAMES"],
                    )
                    df = process_data(df, table_info["COMPANIES"])

                    # Nothing new for this table: skip the delete/load steps
                    # (an empty frame with IF_EXISTS='replace' would wipe the table).
                    if df is None or df.empty:
                        logger.info(f"No new data for table '{table_name}'; skipping.")
                        continue

                    intersection_df = get_intersections(engine, df)
                    delete_intersections(session, intersection_df, table_name)
                    load_data_to_db(df, engine, table_name, table_info["IF_EXISTS"])

            # The mapping workbook does not depend on the table being
            # processed, so it is loaded once here. It used to be re-read and
            # re-written on every iteration of the loop above.
            dfs = load_excel_sheets(DICT_PATH, LIST_OF_SHEETS)
            if dfs:
                transform_and_load_dict(engine, dfs)
        finally:
            # Close pooled connections before the tunnel they run through is torn down.
            engine.dispose()


if __name__ == '__main__':
    main()

2024-05-24 15:52:01,284 - root - INFO - 'main' - Start function
2024-05-24 15:52:01,286 - root - INFO - 'create_ssh_tunnel' - Start function
2024-05-24 15:52:01,290 - root - INFO - 'create_ssh_tunnel' - Function executed
2024-05-24 15:52:01,513 - paramiko.transport - INFO - Connected (version 2.0, client OpenSSH_8.9p1)
2024-05-24 15:52:02,364 - paramiko.transport - INFO - Authentication (password) successful!
2024-05-24 15:52:02,367 - root - INFO - 'create_db_engine' - Start function
2024-05-24 15:52:02,369 - root - INFO - 'create_db_engine' - Function executed
2024-05-24 15:52:02,371 - root - INFO - Processing table: sales
2024-05-24 15:52:02,372 - root - INFO - 'read_excel_files' - Start function
2024-05-24 15:52:02,374 - __main__ - INFO - No Excel files found in the input folder.
2024-05-24 15:52:02,375 - root - INFO - 'read_excel_files' - Function executed
2024-05-24 15:52:02,376 - root - INFO - 'process_data' - Start function
2024-05-24 15:52:02,377 - root - INFO - 'process_data' 