IMPORT AND FUNCTIONS

In [69]:
import os
import configparser
import logging
import pandas as pd
import oracledb

def setup_logging(config):
    # Retrieve logging configuration from config
    log_file = config.get('logging', 'log_file')
    log_level = config.get('logging', 'log_level')

    # Create directory for log file if it does not exist
    log_dir = os.path.dirname(log_file)
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Configure logging settings
    logging.basicConfig(
        filename=log_file,
        level=getattr(logging, log_level.upper()),
        format='%(asctime)s:%(levelname)s:%(message)s'
    )

    # Set up console logging
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

def fetch_data(user, password, dsn, table_name):
    try:
        # Construct the SQL query
        query = f"SELECT * FROM {user}.{table_name}"
        print(query)  # Debug print statements
        print(user)
        print(password)
        print(dsn)

        # Connect to the Oracle database
        connection = oracledb.connect(user=user, password=password, dsn=dsn)
        cursor = connection.cursor()

        # Execute the query and fetch data
        cursor.execute(query)
        columns = [col[0] for col in cursor.description]
        data = cursor.fetchall()

        # Close the cursor and connection
        cursor.close()
        connection.close()

        # Convert data to a pandas DataFrame
        df = pd.DataFrame(data, columns=columns)
        return df
    except Exception as e:
        # Log any exceptions during data fetch
        logging.error(f"Error fetching data: {e}")
        raise e

def upsert_to_customers(df, target_user, target_password, target_dsn, key_columns, source_suffix, target_suffix, table_name):
    try:
        source_len = len(source_suffix)
        target_len = len(target_suffix)

        # Connect to the target Oracle database
        connection = oracledb.connect(user=target_user, password=target_password, dsn=target_dsn)
        cursor = connection.cursor()

        # Iterate over DataFrame rows to perform upsert operations
        for _, row in df.iterrows():
            # Determine if the key columns indicate existing records in target
            if not all([row[f"{key}{target_suffix}_exists"] for key in key_columns]):
                # Insert new record if it does not exist
                insert_columns = key_columns[:]
                insert_values = [row[key] for key in key_columns]
                placeholders = [f':{key}' for key in key_columns]

                # Add non-key columns to the insertion list
                for column in df.columns:
                    if column.endswith(source_suffix) and column[:-source_len] not in key_columns:
                        base_column = column[:-source_len]
                        insert_columns.append(base_column)
                        insert_values.append(row[column])
                        placeholders.append(f":{base_column}")

                # Construct and execute the insert statement
                insert_query = f"""
                    INSERT INTO {table_name} ({', '.join(insert_columns)})
                    VALUES ({', '.join(placeholders)})
                """
                insert_bind_dict = {base_column: value for base_column, value in zip(insert_columns, insert_values)}
                cursor.execute(insert_query, insert_bind_dict)
            else:
                # Update existing record if it exists
                update_columns = []
                bind_dict = {key: row[key] for key in key_columns}

                # Compare each column to check for updates
                for column in df.columns:
                    if column.endswith(source_suffix):
                        base_column = column[:-source_len]
                        target_column = base_column + target_suffix
                        if target_column in df.columns:
                            if row[column] != row[target_column]:
                                update_columns.append(f"{base_column} = :{base_column}")
                                bind_dict[base_column] = row[column]

                # Construct and execute the update statement
                if update_columns:
                    key_condition = ' AND '.join([f"{key} = :{key}" for key in key_columns])
                    update_query = f"UPDATE {table_name} SET {', '.join(update_columns)} WHERE {key_condition}"
                    cursor.execute(update_query, bind_dict)

        # Commit the transaction and close the connection
        connection.commit()
        connection.close()
    except Exception as e:
        # Log  any exceptions that occur during the upsert operation
        logging.error(f"Error during upsert operation: {e}")
        raise e


MAIN 

In [70]:
def main(env='production'):
    try:
        # Configuration Variables
        #Specifies key columns for merging, table name for upsert operations, and suffixes for source and target data columns.
        
        key_columns = ["CUSTOMER_ID"]  # List of key columns for merging
        table_name = "CUSTOMERS"  # Table name for upsert operations
        source_suffix = "_etl"  # Suffix for source data columns
        target_suffix = "_app"  # Suffix for target data columns
        
        # Load configuration from config file
        # Loads the configuration file which contains database connection details and logging settings.
        config = configparser.ConfigParser()
        config.read('config/config.ini')

        # Extract source and target sections from the configuration file
        # Determines the configuration sections to use based on the environment (e.g., production).
        source_section = f'{env}_etl'
        target_section = f'{env}_app'

        # Get database connection details # Extracts the details for connecting to the source and target databases.
        source_user = config[source_section]['username']
        source_password = config[source_section]['password']
        source_dsn = config[source_section]['dsn']
        target_user = config[target_section]['username']
        target_password = config[target_section]['password']
        target_dsn = config[target_section]['dsn']

        # Set up logging
        setup_logging(config)
        logging.info(f'Starting the database operations script in {env} environment.')

        # Fetch data from the source and target databases
        source_data = fetch_data(source_user, source_password, source_dsn, "S_" + table_name)  # Added prefix S_ for staging table
        target_data = fetch_data(target_user, target_password, target_dsn, table_name)

        # Merge source and target data on key columns with indicator column to identify matched and unmatched records.
        df_merged = source_data.merge(target_data, on=key_columns, how="left", suffixes=(source_suffix, target_suffix), indicator=True)

        # Add columns to detect if each key exists in both dataframes
        for key in key_columns:
            df_merged[key + target_suffix + '_exists'] = df_merged['_merge'] == 'both'

        # Debug print to check merged dataframe
        print(df_merged)
        
        # Perform the upsert operation (insert/update logic)
        upsert_to_customers(df_merged, target_user, target_password, target_dsn, key_columns, source_suffix, target_suffix, table_name)

        logging.info(f'Data synchronization between S_{table_name} and {table_name} completed.')
    except Exception as e:
        # Log any exceptions that occur during the main function execution
        logging.error(f"Error in main function: {e}")
        raise e

# Entry point for the script
if __name__ == "__main__":
    main()


SELECT * FROM etl.S_CUSTOMERS
etl
etl
localhost:1521/XEPDB1
SELECT * FROM app.CUSTOMERS
app
app
localhost:1521/XEPDB1
   CUSTOMER_ID EMAIL_ADDRESS_etl FULL_NAME_etl  REC_STS           REC_TMSTP  \
0          392             1qdqw         2ewew        1 2024-09-01 16:03:32   
1         1666             3dqqw        dsdsds        1 2024-08-30 17:18:39   

  EMAIL_ADDRESS_app FULL_NAME_app _merge  CUSTOMER_ID_app_exists  
0             1qdqw         2ewew   both                    True  
1             3dqqw        dsdsds   both                    True  
