In [1]:
import logging
import time
from AGGjobs import *
from config import Config  # Assuming Config class is defined in config.py
# from XML_parse import XMLParser  # Importing the XMLParser class
from database import Database  # Assuming Database class is defined in database.py

# Configure logging
logging.basicConfig(
    filename='database_operations.log',
    level=logging.DEBUG,  # Changed to DEBUG to capture all messages
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w'  # Ensure the file is overwritten each time for clean logs
)
def log_execution_time(job_name, start_time):
    end_time = time.time()
    execution_time = end_time - start_time
    logging.info(f"Execution time for {job_name}: {execution_time:.2f} seconds")

def main():
    config_file = "config.yaml"
    config = Config(config_file)

    # Retrieve JDBC parameters and create a Database instance
    jdbc_params = config.get_jdbc_parameters()

    db = Database(jdbc_params)
    db.set_jdbc_parameters(jdbc_params)
    db.connect_JDBC()

    # Get the execution date
    execution_date_query = config.get_param('queries', 'TRANSVERSE_QUERY_LASTEXECUTIONDATE')
    execution_date = db.get_execution_date(execution_date_query)
    # logging.info(f"Execution Date: {execution_date}")


    # Job execution without a loop
        
    # start_time = time.time()
    # logging.info("Starting AUD_404_AGG_TAGGREGATE...")

    # AUD_404_AGG_TAGGREGATE(config, db,  execution_date)
    # log_execution_time("AUD_404_AGG_TAGGREGATE", start_time)

    start_time = time.time()
    logging.info("Starting AUD_405_AGG_TMAP...")
    AUD_405_AGG_TMAP(config, db,  execution_date)
    log_execution_time("AUD_405_AGG_TMAP", start_time)
    # Job execution without a loop
    # start_time = time.time()
    # logging.info("Starting AUD_405_AGG_TXMLMAP...")
    # AUD_405_AGG_TXMLMAP(config, db,  execution_date)
    # log_execution_time("AUD_405_AGG_TXMLMAP", start_time)

if __name__ == "__main__":
    main()


JDBC Driver: com.mysql.cj.jdbc.Driver
JDBC URL: jdbc:mysql://localhost:3306/sqops_dataraise?allowLoadLocalInfile=true&characterEncoding=utf8
JDBC User: root
JDBC JAR: C:/Users/sonia/Downloads/mysql-connector-j-9.0.0/mysql-connector-j-9.0.0/mysql-connector-j-9.0.0.jar


In [None]:
import pandas as pd

# Sample input DataFrame (input_df)
input_data = {
    'rowName': ['access', 'access', 'access', 'access', 'access'],
    'nameColumnInput': ['abris', 'accessibilite', 'annee_pose', 'arret', 'arret_pp'],
    'composant': ['tMap_2', 'tMap_2', 'tMap_2', 'tMap_2', 'tMap_2'],
    'NameJob': ['KEO_ETL_MOBIREPORT_LoadBDDAccess']*5,
    'NameProject': ['KEOLISTOURS']*5
}

input_df = pd.DataFrame(input_data)

# Simulated DataFrames for joins
aud_agg_tmapinputinoutput_data = {
    'rowName': ['access', 'access', 'access', 'access'],
    'NameRowInput': ['abris', 'accessibilite', 'annee_pose', 'arret'],
    'composant': ['tMap_2']*4,
    'NameJob': ['KEO_ETL_MOBIREPORT_LoadBDDAccess']*4,
    'NameProject': ['KEOLISTOURS']*4
}
aud_agg_tmapinputinoutput_df = pd.DataFrame(aud_agg_tmapinputinoutput_data)

aud_agg_tmapinputinfilteroutput_data = {
    'rowName': ['access', 'access', 'access'],
    'NameRowInput': ['abris', 'accessibilite', 'annee_pose'],
    'composant': ['tMap_2']*3,
    'NameProject': ['KEOLISTOURS']*3,
    'NameJob': ['KEO_ETL_MOBIREPORT_LoadBDDAccess']*3
}
aud_agg_tmapinputinfilteroutput_df = pd.DataFrame(aud_agg_tmapinputinfilteroutput_data)

# Function for the join operation
def catch_inner_join_rejects(left_df, right_df, left_join_keys, right_join_keys):
    
    logging.info("Starting inner join rejection detection.")
    logging.debug(f"Left DataFrame shape: {left_df.shape}")
    logging.debug(f"Right DataFrame shape: {right_df.shape}")

    # Perform a left join to detect left rejects
    logging.info("Performing left join to detect rows rejected from the left DataFrame.")
    left_join_df = pd.merge(
        left_df,
        right_df,
        left_on=left_join_keys,
        right_on=right_join_keys,
        how='left',
        indicator=True  # Adds a '_merge' column to indicate join status
    )
    left_rejects = left_join_df[left_join_df['_merge'] == 'left_only'].drop(columns=['_merge'])
    logging.info(f"Detected {len(left_rejects)} left rejects.")
    logging.debug(f"Left rejects sample:\n{left_rejects.head()}")

    # Perform a right join to detect right rejects
    logging.info("Performing right join to detect rows rejected from the right DataFrame.")
    right_join_df = pd.merge(
        left_df,
        right_df,
        left_on=left_join_keys,
        right_on=right_join_keys,
        how='right',
        indicator=True  # Adds a '_merge' column to indicate join status
    )
    right_rejects = right_join_df[right_join_df['_merge'] == 'right_only'].drop(columns=['_merge'])
    logging.info(f"Detected {len(right_rejects)} right rejects.")
    logging.debug(f"Right rejects sample:\n{right_rejects.head()}")

    # Combine left and right rejects
    logging.info("Combining left and right rejects for overall analysis.")
    all_rejects = pd.concat([left_rejects, right_rejects], ignore_index=True)

    # Optionally, remove duplicates if you expect overlaps
    all_rejects = all_rejects.drop_duplicates()

    logging.debug(f"Total rejects detected: {len(all_rejects)}.")
    logging.debug(f"Combined rejects sample:\n{all_rejects.head()}")

    logging.info("Finished inner join rejection detection.")
    return all_rejects

# Step 3: Testing the function
# First, rename 'nameColumnInput' to 'NameRowInput' in input_df for consistency
input_df = input_df.rename(columns={"nameColumnInput": "NameRowInput"})

# Test the join function sequentially with multiple DataFrames
df = catch_inner_join_rejects(input_df, aud_agg_tmapinputinoutput_df, 
                               ['rowName', 'NameRowInput', 'composant', 'NameJob', 'NameProject'],
                               ['rowName', 'NameRowInput', 'composant', 'NameJob', 'NameProject'])

df1 = catch_inner_join_rejects(df, aud_agg_tmapinputinfilteroutput_df,
                                ['rowName', 'NameRowInput', 'composant', 'NameProject', 'NameJob'],
                                ['rowName', 'NameRowInput', 'composant', 'NameProject', 'NameJob'])

# Output the result of the final rejection data
print("Final rejected rows:")
print(df1)


IndentationError: expected an indented block after function definition on line 34 (2979018070.py, line 35)