In [1]:
# Let us import the required libraries
import os
import time
import pandas as pd
import logging
from dotenv import dotenv_values
from psycopg2 import DatabaseError, InterfaceError, OperationalError
from sqlalchemy import create_engine, text

# Load the configuration from .env into context
config = dotenv_values(".env")

# DB Variables
pg_host = config['POSTGRES_HOST']
pg_port = config['POSTGRES_PORT']
pg_db = config['POSTGRES_DB']
pg_schema = config['POSTGRES_SCHEMA']
pg_user = config['POSTGRES_USER']
pg_password = config['POSTGRES_PASS']

In [2]:
# Data Source Files Path    
files_path_digi_sa = config['DATA_FILES_PATH_DIGI_SA']

# DB Connection String
DATABASE_URL = f"postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

pd.set_option('display.max_rows', None)  # Display all rows
pd.set_option('display.max_columns', None)

# Data Cleaning Variables
check_duplicates_data = ['country_name', 'country_code', 'indicator_code']
check_duplicates_countries = ['country_code', 'region']
check_duplicates_indicators = ['indicator_code']

unused_columns = ['scale_precision', 'table_name', 'long_defintion', 'topic', 'periodicity', 'aggregation_method', 'general_comments']

column_data_types = {
    'country_name': 'str', 'country_code': 'str', 'indicator_name': 'str', 'indicator_code': 'str', 'year': 'int', 'indicator_value': 'float',
    'region': 'str', 'income_group': 'str', 'special_notes': 'str', 'table_name': 'str',
    'source_note': 'str', 'source_organization': 'str'
}

sheet_dict = {}

In [3]:
# Creating Class to encapsulate
class DataFrameOperations:
    def convert_columns_lower(self, sheet_df):
        logging.info("Converting Columns to Lowercase")
        sheet_df.columns = sheet_df.columns.str.replace(' ', '_').str.replace(r'(?<!^)(?=[A-Z])', '_').str.lower()
        return sheet_df    

    def remove_duplicates(self, sheet_df, columns):
        logging.info(f"Duplicates before removal: {sheet_df.duplicated(subset=columns, keep='first').sum()}")	
        sheet_df.drop_duplicates(subset=columns, keep='first', inplace=True)
        return sheet_df
    
    def remove_unused_columns(self, sheet_df, columns):
        logging.info(f"Removing Columns: {columns}")
        sheet_df.drop(columns, axis=1, inplace=True, errors='ignore')
        return sheet_df
    
    def melt_data(self, sheet_df, id_vars, var_name, value_name):
        logging.info("Melting Data")
        sheet_df = pd.melt(sheet_df, id_vars=id_vars, var_name=var_name, value_name=value_name)
        logging.info(f"Columns after Melting Data: {sheet_df.columns}")
        return sheet_df    

    def replace_colvalues(self, sheet_df, columns):
        logging.info(f"Before Replacing Column Values: {sheet_df[columns].head(1)}")
        sheet_df[columns] = sheet_df[columns].str.replace(r"(\d{4})_\[yr\d{4}\]", r"\1", regex=True)
        logging.info(f"After Replacing Column Values: {sheet_df[columns].head(1)}")
        return sheet_df

    def interpolate_na_rows(self, sheet_df, columns):
        logging.info(f"Interpolating N/A Columns: {columns}")
        sheet_df[columns].interpolate(method='polynomial', order=2, inplace=True)
        return sheet_df
    
    def standardize_data_types(self, sheet_df, columns):
        logging.info("Standardizing Data Types")
        for column, dtype in columns.items():
            if column in sheet_df.columns:
                sheet_df[column] = sheet_df[column].astype(dtype)
        return sheet_df  

    def convert_datetime(self, sheet_df, columns):
        logging.info(f"Converting Datetime Columns: {columns}")
        sheet_df[columns] = pd.to_datetime(sheet_df[columns], dayfirst=True)
        return sheet_df
    
    def calculate_lead_times(self, sheet_df, col_A, col_B):
        return (sheet_df[col_B] - sheet_df[col_A]).dt.days

    def perform_db_ops(self, DATABASE_URL, sheet_dict):
        logging.info(f"Writing data to DB {sheet_dict.keys()}")
        engine = create_engine(DATABASE_URL) #, connect_args={"connect_timeout": 100})
        try:
            for table_name, database_df in sheet_dict.items():
                table_name = f"{table_name}_digi_sa"
                logging.info(f"Table Name: {table_name}")
                start_time = time.time()
                logging.info(f"Start Time: {start_time}")
                logging.info(f"Engine: {engine}")
                logging.info(f"Schema: {pg_schema}")
                database_df.to_sql(table_name, engine, schema=pg_schema, if_exists='replace', index=False) #, chunksize=1000, method='multi')
                end_time = time.time()
                logging.info(f"DB write duration: {end_time - start_time:.2f} seconds")
                logging.info("Data successfully imported to the PostgreSQL database.")
                with engine.connect() as connection:
                    result = connection.execute(text(f"SELECT count(*) FROM {pg_schema}.{table_name};"))
                    logging.info(f"Count of Rows in table {table_name}: {result.scalar()}")
        except (OperationalError, InterfaceError, DatabaseError) as db_err:
            logging.error(f"Database error occurred: {db_err}")
        except Exception as e:
            logging.error(f"An unexpected error occurred: {e}")
        finally: 
            engine.dispose()
            logging.info("Database connection closed.")

In [4]:
# Creating an instance of the class
df_ops = DataFrameOperations()

# Loop over each sheet
for filename in os.listdir(files_path_digi_sa):
    logging.info(f"Processing file: {filename}")
    # Construct the full file path
    file_path = os.path.join(files_path_digi_sa, filename)
    
    try:
        sheets_dict = pd.read_excel(file_path, sheet_name=None, engine="xlrd")
        logging.info(f"Sheet Names: {sheets_dict.keys()}")
    except Exception as e:
        logging.error(f"Error reading the file: {e}")
        continue
    
    for sheet_name, sheet_df in sheets_dict.items():
        logging.info(f"Processing Sheet: {sheet_name}")
        
        # Common cleansing for all sheets
        sheet_df = df_ops.convert_columns_lower(sheet_df)
        sheet_df = df_ops.remove_unused_columns(sheet_df, unused_columns) 
        # Sheet specific cleansing
        if sheet_name in ['Data', 'Previous_Data']:
            sheet_df = df_ops.remove_duplicates(sheet_df, check_duplicates_data)            
            sheet_df = df_ops.melt_data(sheet_df, ['country_name', 'country_code', 'indicator_name', 'indicator_code'], 'year', 'indicator_value')
            sheet_df = df_ops.replace_colvalues(sheet_df, 'year')
            sheet_df = df_ops.standardize_data_types(sheet_df, column_data_types)
            sheet_df = df_ops.interpolate_na_rows(sheet_df, 'indicator_value')          
            sheet_dict[sheet_name.lower()] = sheet_df             
            
        elif sheet_name == 'Metadata - Countries':
            sheet_df = df_ops.remove_duplicates(sheet_df, check_duplicates_countries)
            sheet_df = df_ops.standardize_data_types(sheet_df, column_data_types)
            sheet_dict[sheet_name.split('-')[1].lower().strip()] = sheet_df           

        elif sheet_name == 'Metadata - Indicators':
            sheet_df = df_ops.remove_duplicates(sheet_df, check_duplicates_indicators)
            sheet_df = df_ops.standardize_data_types(sheet_df, column_data_types)
            sheet_dict[sheet_name.split('-')[1].lower().strip()] = sheet_df
      
        else:
            logging.warning(f"Sheet {sheet_name} not found")

       

2024-10-29 15:41:06,648 - INFO - Processing file: P_Data_Extract_From_Africa_Development_Indicators.xls
2024-10-29 15:41:10,981 - INFO - Sheet Names: dict_keys(['Data', 'Metadata - Countries', 'Metadata - Indicators', 'Previous_Data'])
2024-10-29 15:41:10,982 - INFO - Processing Sheet: Data
2024-10-29 15:41:10,983 - INFO - Converting Columns to Lowercase
2024-10-29 15:41:10,990 - INFO - Removing Columns: ['scale_precision', 'table_name', 'long_defintion', 'topic', 'periodicity', 'aggregation_method', 'general_comments']
2024-10-29 15:41:11,023 - INFO - Duplicates before removal: 0
2024-10-29 15:41:11,032 - INFO - Melting Data
2024-10-29 15:41:11,140 - INFO - Columns after Melting Data: Index(['country_name', 'country_code', 'indicator_name', 'indicator_code',
       'year', 'indicator_value'],
      dtype='object')
2024-10-29 15:41:11,152 - INFO - Before Replacing Column Values: 0    1990_[yr1990]
Name: year, dtype: object
2024-10-29 15:41:11,192 - INFO - After Replacing Column Values:

In [5]:
df_ops.perform_db_ops(DATABASE_URL, sheet_dict)
logging.info(f"Data Processing Completed for file: {filename}") 

2024-10-29 15:41:14,092 - INFO - Writing data to DB dict_keys(['data', 'countries', 'indicators', 'previous_data'])
2024-10-29 15:41:14,460 - INFO - Table Name: data_digi_sa
2024-10-29 15:41:14,466 - INFO - Start Time: 1730212874.466476
2024-10-29 15:41:14,468 - INFO - Engine: Engine(postgresql://shaunkutsanzira:***@data-analytics-course-2.c8g8r1deus2v.eu-central-1.rds.amazonaws.com:5432/hh_analytics_24_2)
2024-10-29 15:41:14,475 - INFO - Schema: capstone_digital_transformation_impact


: 

In [None]:
os.listdir(files_path_digi_sa)

In [None]:
sheet_dict.keys()

In [None]:
DATABASE_URL

In [None]:
engine