In [None]:
import logging
import os
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Configure the root logger: INFO threshold, with each record rendered as
# "<timestamp> - <level name> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

def extract_date(filename):
    """
    Return the date token embedded in a file name.

    The token is the text that follows the last underscore, cut off at the
    first dot after it; for 'FILENAME_YYYYMMDD.csv' that is 'YYYYMMDD'.
    The extracted text is returned as-is, without any validation.
    """
    # Everything after the final underscore (the whole name if there is none).
    tail = filename.rsplit('_', 1)[-1]
    # Keep only what precedes the first dot, i.e. drop the extension(s).
    date_token, _, _ = tail.partition('.')
    return date_token

# Directory that is scanned for input CSV files.
# NOTE(review): the value looks like a placeholder - replace before running.
data_lake_container = 'path_to_your_data_lake_container'

# Database connection settings.
# NOTE(review): these also look like placeholders; consider loading the real
# credentials from the environment or a secrets store instead of the source.
db_user = 'your_database_username'
db_password = 'your_database_password'
db_name = 'your_database_name'
db_host = 'your_database_host'
db_port = 'your_database_port'

# The user name and password are percent-encoded before being placed in the
# URL so that characters with a meaning inside a URL (such as '@', ':' or '/')
# cannot corrupt the connection string.
engine = create_engine(
    f'mysql+pymysql://{quote_plus(db_user)}:{quote_plus(db_password)}'
    f'@{db_host}:{db_port}/{db_name}'
)


In [None]:
# Discover the input files. A missing container directory is fatal: it is
# logged and the error is re-raised, because nothing below can run without
# the listing.
try:
    # os.listdir returns entries in arbitrary order; sorting makes the
    # processing order (and therefore the logs) deterministic.
    files = sorted(os.listdir(data_lake_container))
    logging.info(f'Found {len(files)} files in {data_lake_container}')
except FileNotFoundError:
    logging.error(f'Directory {data_lake_container} not found.')
    raise

# One accumulator list per recognised file family.
cust_mstr_dataframes = []
master_child_dataframes = []
h_ecom_order_dataframes = []

# File-name prefixes this script knows how to handle.
known_prefixes = ('CUST_MSTR', 'master_child_export', 'H_ECOM_ORDER')

for file in files:
    # Classify by name before parsing, so unrelated files are skipped
    # without being read.
    if not file.startswith(known_prefixes):
        logging.warning(f'Skipped file: {file}. Unknown file type.')
        continue

    # A failure in one file is logged and must not stop the remaining files.
    try:
        file_path = os.path.join(data_lake_container, file)
        logging.info(f'Reading data from {file_path}')
        df = pd.read_csv(file_path)

        # NOTE(review): both date branches read a 'FileName' column from the
        # CSV content itself, not the name of the file on disk - confirm that
        # the input files really carry that column.
        if file.startswith('CUST_MSTR'):
            # 'Date' holds the raw token taken from each row's FileName.
            df['Date'] = df['FileName'].apply(extract_date)
            cust_mstr_dataframes.append(df)
            logging.info(f'Processed CUST_MSTR file: {file}')

        elif file.startswith('master_child_export'):
            # Extract the raw YYYYMMDD token once per row and reuse it for
            # both derived columns.
            date_keys = df['FileName'].apply(extract_date)
            # 'Date' is the token re-formatted as YYYY-MM-DD; a token that is
            # not a valid YYYYMMDD date raises and the file is reported below.
            df['Date'] = pd.to_datetime(date_keys, format='%Y%m%d').dt.strftime('%Y-%m-%d')
            # 'DateKey' keeps the raw token.
            df['DateKey'] = date_keys
            master_child_dataframes.append(df)
            logging.info(f'Processed master_child_export file: {file}')

        else:
            # Only the H_ECOM_ORDER prefix can reach this point; these files
            # are collected without any added columns.
            h_ecom_order_dataframes.append(df)
            logging.info(f'Processed H_ECOM_ORDER file: {file}')

    except Exception as e:
        logging.error(f'Error processing file {file}: {str(e)}')

print('Data processing completed.')


In [None]:
def load_dataframe_to_db(df, table_name):
    """
    Write a DataFrame to a database table, replacing the table if it exists.

    The write goes through the module-level ``engine``. Any failure is logged
    instead of raised, so a caller can carry on with other tables.

    Args:
        df: DataFrame to write; its index is not stored.
        table_name: Name of the destination table.

    Returns:
        True if the write succeeded, False if it raised.
    """
    try:
        df.to_sql(table_name, con=engine, if_exists='replace', index=False)
    except Exception as e:
        logging.error(f'Error loading data into {table_name} table: {str(e)}')
        return False
    logging.info(f'Data loaded into {table_name} table successfully.')
    return True


# Each table is written exactly once, from the concatenation of all frames
# collected for it. The write uses if_exists='replace', so loading the frames
# one at a time would leave only the last frame's rows in the table.
tables_to_load = (
    ('CUST_MSTR', cust_mstr_dataframes),
    ('master_child', master_child_dataframes),
    ('H_ECOM_Orders', h_ecom_order_dataframes),
)

failed_tables = []
try:
    for table_name, frames in tables_to_load:
        if not frames:
            # Nothing was collected for this table; leave any existing table
            # untouched rather than replacing it with nothing.
            continue
        combined = pd.concat(frames, ignore_index=True)
        if not load_dataframe_to_db(combined, table_name):
            failed_tables.append(table_name)
except Exception as e:
    # Covers failures outside the per-table write (e.g. while concatenating).
    logging.error(f'Error loading data into database: {str(e)}')
else:
    # Report success only when every attempted table was actually written.
    if failed_tables:
        failed_list = ', '.join(failed_tables)
        logging.error(f'Error loading data into database: failed tables: {failed_list}')
    else:
        logging.info('All data loaded into database successfully.')
