In [27]:
import datetime
import logging
import os
import time
import json
import boto3
##################################
# Notebook-side environment setup (presumably only needed when running the
# handler outside of AWS Lambda — TODO confirm before deploying this code).
import dotenv
# Load variables from a local .env file, if present, into the process environment.
# lambda_handler below reads DYNAMO_DB_TABLE and TOPIC_ARN through os.getenv.
dotenv.load_dotenv()
# Set the default boto3 session so that later boto3.client()/boto3.resource()
# calls use this named profile and region.
# NOTE(review): the profile name looks like a production admin profile — confirm
# this is intended before running cells that talk to AWS.
boto3.setup_default_session(profile_name='prd-valorx-admin', region_name='us-east-1')
##################################
# Root logger with its level set to INFO; used by every function below.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def find_key(data, key_to_find):
    """
    Recursively search nested dicts/lists for a key and return it with its value.

    The search is depth-first. At each dictionary the key is looked up at that
    level first; only if it is absent are the dictionary's values searched, in
    iteration order. Lists are searched element by element. Any other type is
    treated as a leaf.

    Args:
        data: The structure to search (dict, list, or any leaf value).
        key_to_find: The key being looked for.

    Returns:
        A single-entry dict ``{key_to_find: value}`` for the first match, or
        None when the key does not occur anywhere in ``data``.
    """
    if isinstance(data, dict):
        # A hit at the current level wins over anything nested deeper.
        if key_to_find in data:
            return {key_to_find: data[key_to_find]}
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        # Leaf value: nothing to descend into.
        return None

    for child in children:
        match = find_key(child, key_to_find)
        if match:
            return match

    # Not found in this subtree.
    return None
    
def send_success_message(topic_arn, endpoint_name, process_id):
    """
    Publish a "tables loaded" notification to an SNS topic.

    Args:
        topic_arn: ARN of the SNS topic to publish to.
        endpoint_name: Source name, included in the log line and the message text.
        process_id: Process identifier, included in the log line and the message text.

    Returns:
        None. The publish response is not returned to the caller.
    """
    sns = boto3.client("sns")
    logger.info(f"sending succeded message for {endpoint_name} : {process_id}")
    body = f"successfully load tables from process : {process_id} Source : {endpoint_name}"
    sns.publish(TopicArn=topic_arn, Message=body)

def _scan_failed_tables(config_table, endpoint, status_attribute):
    """
    Scan the config table for active entries of an endpoint whose status is 'FAILED'.

    Args:
        config_table: boto3 DynamoDB Table resource to scan.
        endpoint: Value the ENDPOINT attribute must equal.
        status_attribute: Name of the status attribute to test
            (the handler passes 'STATUS_RAW' or 'STATUS_STAGE').

    Returns:
        The raw response of a single ``scan`` call.
        NOTE(review): only one scan page is requested; if complete results are
        ever needed, follow ``LastEvaluatedKey`` — confirm against the table size.
    """
    return config_table.scan(
        FilterExpression=f"ENDPOINT = :val1 AND ACTIVE_FLAG = :val2 AND {status_attribute} = :val3 ",
        ExpressionAttributeValues={
            ':val1': endpoint,
            ':val2': 'Y',
            ':val3': 'FAILED'
        }
    )


def lambda_handler(event, context):
    """
    Resolve the endpoint and process id for the table named in the event's stage job result.

    The handler locates the ``stage_job_result`` entry anywhere in ``event``,
    reads the table name from its ``Arguments['--TABLE_NAME']``, fetches that
    table's configuration item from the DynamoDB table named by the
    ``DYNAMO_DB_TABLE`` environment variable, and logs which active tables of
    the same endpoint are currently marked FAILED in the raw and stage layers.

    Args:
        event: Invocation payload (nested dicts/lists) containing a
            ``stage_job_result`` key somewhere in its structure.
        context: Lambda context object; not used.

    Returns:
        On success:
            ``{'result': 'SUCCESS', 'endpoint', 'table_names', 'process_id'}``
            (``table_names`` is always an empty string).
        On any failure:
            ``{'result': 'FAILED', 'endpoint': '', 'table_names': '', 'message_error'}``.
        The handler never raises; every exception is logged and reported in
        ``message_error``.
    """
    try:
        logger.info(event)
        client = boto3.resource('dynamodb')
        dynamo_table_name = os.getenv('DYNAMO_DB_TABLE')
        # Only needed by the disabled success notification further down.
        topic_arn = os.getenv("TOPIC_ARN")
        config_table_metadata = client.Table(dynamo_table_name)

        key = find_key(event, "stage_job_result")
        if key is None:
            logger.error("Key 'stage_job_result' not found in event")
            return {
                'result': "FAILED",
                'endpoint': "",
                'table_names': "",
                'message_error': "Key 'stage_job_result' not found in event"
            }

        table = key['stage_job_result']['Arguments']['--TABLE_NAME']

        table_names = ""
        # get_item omits 'Item' from its response when no item matches the key,
        # so handle that case explicitly instead of failing with a bare KeyError.
        table_data = config_table_metadata.get_item(Key={'TARGET_TABLE_NAME': table}).get('Item')
        if table_data is None:
            message_error = f"No configuration item found for TARGET_TABLE_NAME '{table}'"
            logger.error(message_error)
            return {
                'result': "FAILED",
                'endpoint': "",
                'table_names': "",
                'message_error': message_error
            }
        endpoint = table_data['ENDPOINT']
        process_id = table_data['PROCESS_ID']

        # Best effort: the failed-table lookups are informational only, so an
        # error here is logged and the handler still reports SUCCESS.
        try:
            raw_failed_tables = _scan_failed_tables(config_table_metadata, endpoint, 'STATUS_RAW')
            logger.info(f"failed tables in raw: {raw_failed_tables}")

            stage_failed_tables = _scan_failed_tables(config_table_metadata, endpoint, 'STATUS_STAGE')
            logger.info(f"failed tables in stage: {stage_failed_tables}")
            # Success notification, left disabled as in the original code. The
            # original second clause re-checked raw_failed_tables; it is
            # corrected here to check stage_failed_tables.
            #if (not 'Items' in raw_failed_tables.keys() or raw_failed_tables['Items'] == []) and (not 'Items' in stage_failed_tables.keys() or stage_failed_tables['Items'] == []):
            #    send_success_message(topic_arn, endpoint, process_id)

        except Exception as e:
            logger.error(str(e))

        return {
            'result': "SUCCESS",
            'endpoint': endpoint,
            'table_names': table_names,
            'process_id': process_id
        }

    except Exception as e:
        logger.error("Exception: {}".format(e))
        return {
            'result': "FAILED",
            'endpoint': "",
            'table_names': "",
            'message_error': str(e)
        }
    

In [23]:
# Sample handler input used by the cells below. It nests a "stage_job_result"
# entry inside query_results -> result[0].
event = {
    "result": "SUCCEEDED",
    "dynamodb_key": [{"table": "PEBDDATA_M_COMPANIA"}],
    "process": "10",
    "execute_raw": False,
    "query_results": {
        "status": "COMPLETED",
        "result": [
            {
                "raw_job_result": {"JobRunId": "N/A-skipped", "Status": "SKIPPED"},
                "process": "10",
                "dynamodb_key": {"table": "PEBDDATA_M_COMPANIA"},
                "execute_raw": False,
                "stage_job_result": {
                    "AllocatedCapacity": 2,
                    "Arguments": {"--TABLE_NAME": "PEBDDATA_M_COMPANIA"},
                    "Attempt": 0,
                    "CompletedOn": 1747569665090,
                    "ExecutionTime": 141,
                    "GlueVersion": "4.0",
                    "Id": "jr_bcad3ec1155952416651ba2fb5a623c4777399a7c86a88ad7d0aad25997000d6",
                    "JobMode": "SCRIPT",
                    "JobName": "sofia-dev-datalake-light_transform-job",
                    "JobRunState": "SUCCEEDED",
                    "LastModifiedOn": 1747569665090,
                    "LogGroupName": "/aws-glue/jobs",
                    "MaxCapacity": 2.0,
                    "NumberOfWorkers": 2,
                    "PredecessorRuns": [],
                    "StartedOn": 1747569511880,
                    "Timeout": 180,
                    "WorkerType": "G.1X",
                },
            }
        ],
    },
}

In [None]:
# Look up the "stage_job_result" entry wherever it is nested inside the sample event.
result = find_key(event, "stage_job_result")

In [18]:
# Display the value currently bound to `result` (assigned from find_key in the notebook).
result

In [13]:
# Read the same entry through its explicit path in the sample event
# (presumably to cross-check what find_key returns).
event['query_results']['result'][0]['stage_job_result']


{'AllocatedCapacity': 2,
 'Arguments': {'--TABLE_NAME': 'PEBDDATA_M_COMPANIA'},
 'Attempt': 0,
 'CompletedOn': 1747569665090,
 'ExecutionTime': 141,
 'GlueVersion': '4.0',
 'Id': 'jr_bcad3ec1155952416651ba2fb5a623c4777399a7c86a88ad7d0aad25997000d6',
 'JobMode': 'SCRIPT',
 'JobName': 'sofia-dev-datalake-light_transform-job',
 'JobRunState': 'SUCCEEDED',
 'LastModifiedOn': 1747569665090,
 'LogGroupName': '/aws-glue/jobs',
 'MaxCapacity': 2.0,
 'NumberOfWorkers': 2,
 'PredecessorRuns': [],
 'StartedOn': 1747569511880,
 'Timeout': 180,
 'WorkerType': 'G.1X'}

In [28]:
# Re-declare the sample payload (same content as the earlier sample-event cell)
# and run the handler against it. The handler does not use its context
# argument, so None is passed.
event = {
    "result": "SUCCEEDED",
    "dynamodb_key": [{"table": "PEBDDATA_M_COMPANIA"}],
    "process": "10",
    "execute_raw": False,
    "query_results": {
        "status": "COMPLETED",
        "result": [
            {
                "raw_job_result": {"JobRunId": "N/A-skipped", "Status": "SKIPPED"},
                "process": "10",
                "dynamodb_key": {"table": "PEBDDATA_M_COMPANIA"},
                "execute_raw": False,
                "stage_job_result": {
                    "AllocatedCapacity": 2,
                    "Arguments": {"--TABLE_NAME": "PEBDDATA_M_COMPANIA"},
                    "Attempt": 0,
                    "CompletedOn": 1747569665090,
                    "ExecutionTime": 141,
                    "GlueVersion": "4.0",
                    "Id": "jr_bcad3ec1155952416651ba2fb5a623c4777399a7c86a88ad7d0aad25997000d6",
                    "JobMode": "SCRIPT",
                    "JobName": "sofia-dev-datalake-light_transform-job",
                    "JobRunState": "SUCCEEDED",
                    "LastModifiedOn": 1747569665090,
                    "LogGroupName": "/aws-glue/jobs",
                    "MaxCapacity": 2.0,
                    "NumberOfWorkers": 2,
                    "PredecessorRuns": [],
                    "StartedOn": 1747569511880,
                    "Timeout": 180,
                    "WorkerType": "G.1X",
                },
            }
        ],
    },
}
lambda_handler(event, None)

{'result': 'SUCCESS',
 'endpoint': 'PEBDDATA',
 'table_names': '',
 'process_id': '10'}

In [20]:
# json is already imported in the first cell; this repeated import is harmless but redundant.
import json

# Serialize the sample event to a JSON string (Python False is written as JSON false).
json.dumps(event)

'{"result": "SUCCEEDED", "dynamodb_key": [{"table": "PEBDDATA_M_COMPANIA"}], "process": "10", "execute_raw": false, "query_results": {"status": "COMPLETED", "result": [{"raw_job_result": {"JobRunId": "N/A-skipped", "Status": "SKIPPED"}, "process": "10", "dynamodb_key": {"table": "PEBDDATA_M_COMPANIA"}, "execute_raw": false, "stage_job_result": {"AllocatedCapacity": 2, "Arguments": {"--TABLE_NAME": "PEBDDATA_M_COMPANIA"}, "Attempt": 0, "CompletedOn": 1747569665090, "ExecutionTime": 141, "GlueVersion": "4.0", "Id": "jr_bcad3ec1155952416651ba2fb5a623c4777399a7c86a88ad7d0aad25997000d6", "JobMode": "SCRIPT", "JobName": "sofia-dev-datalake-light_transform-job", "JobRunState": "SUCCEEDED", "LastModifiedOn": 1747569665090, "LogGroupName": "/aws-glue/jobs", "MaxCapacity": 2.0, "NumberOfWorkers": 2, "PredecessorRuns": [], "StartedOn": 1747569511880, "Timeout": 180, "WorkerType": "G.1X"}}]}}'