# AWS Glue ETL - Oracle Extraction Development Notebook

Este notebook permite desarrollar y probar el ETL de Oracle Extraction en un entorno local que replica AWS Glue. Utiliza las mismas bibliotecas, frameworks y patrones que se usarían en AWS Glue en la nube.

## 1. Inicialización del Entorno AWS Glue

Primero, configuramos el entorno de ejecución de AWS Glue con SparkContext, GlueContext y Job:

In [None]:
# Importaciones de AWS Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Importaciones para el ETL
from pyspark.sql.functions import col, when, concat_ws
import boto3
import json
import logging
from io import StringIO

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Inicializar contexto Glue y Spark
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Verificar versiones
print(f"Spark version: {spark.version}")
print(f"Python version: {sys.version}")

## 2. Configuración de Parámetros del Job

Simulamos los parámetros que normalmente se pasarían al job de Glue:

In [None]:
# Definir parámetros de prueba
test_input = [
    {
        "enterprise": "URBANOS DE SEGOVIA",
        "body": json.dumps({
            "P_EMPRESA": "URBANOS DE SEGOVIA",
            "P_CONTR": "001",
            "P_CONTR_FLAG": "False",
            "P_VERSION": "20240101_20241231",
            "P_FECHAD": "20240101",
            "P_FECHAH": "20241231",
            "P_ENV": "dev",
            "P_LINE": "001"
        }),
        "id": "1"
    }
]

# Mock de getResolvedOptions
def mock_getResolvedOptions(argv, options):
    return {
        "JOB_NAME": "oracle-extraction-dev",
        "bronze_bucket": "mado-gtfs-dev-eu-west-1-992807582431-bronze",
        "json_input": json.dumps(test_input)
    }

# Reemplazar getResolvedOptions
import awsglue.utils
awsglue.utils.getResolvedOptions = mock_getResolvedOptions

# Obtener argumentos del trabajo
args = getResolvedOptions(sys.argv, ["JOB_NAME", "bronze_bucket", "json_input"])

# Inicializar el job
job.init(args["JOB_NAME"], args)

# Extraer parámetros
bronze_bucket = args["bronze_bucket"]
json_input = args["json_input"]
explotations = json.loads(json_input)

# Extraer parámetros específicos
p_empresa = json.loads(explotations[0]["body"])["P_EMPRESA"]
p_contr = json.loads(explotations[0]["body"])["P_CONTR"]
p_contr_flag = json.loads(explotations[0]["body"])["P_CONTR_FLAG"]
p_version = json.loads(explotations[0]["body"])["P_VERSION"]
p_fechad = json.loads(explotations[0]["body"])["P_FECHAD"]
p_fechah = json.loads(explotations[0]["body"])["P_FECHAH"]
p_env = json.loads(explotations[0]["body"])["P_ENV"]

# Extraer líneas
p_lines = []
for explotation in explotations:
    p_lines.append(json.loads(explotation["body"])["P_LINE"])
lineas_activas = tuple(p_lines)

# Mostrar parámetros configurados
print(f"Parámetros configurados:")
print(f"- Empresa: {p_empresa}")
print(f"- Contrato: {p_contr}")
print(f"- Versión: {p_version}")
print(f"- Fechas: {p_fechad} - {p_fechah}")
print(f"- Líneas: {lineas_activas}")

## 3. Configuración de Secretos y Conexión a Oracle

Para conectar a Oracle, necesitamos obtener los secretos de AWS o configurar una conexión local:

In [None]:
def get_secret(secret_name, region_name="eu-west-1", use_local=False):
    if use_local:
        mock_secrets = {
            "/gtfs/on-prem/gestra": {
                "username": "gestra_user",
                "password": "gestra_password",
                "host": "oracle-host",
                "port": "1521",
                "service": "XEPDB1"
            }
        }
        return mock_secrets.get(secret_name)
    else:
        try:
            session = boto3.session.Session()
            client = session.client(
                service_name='secretsmanager',
                region_name=region_name
            )
            get_secret_value_response = client.get_secret_value(SecretId=secret_name)
            
            if 'SecretString' in get_secret_value_response:
                secret = get_secret_value_response['SecretString']
                return json.loads(secret)
        except Exception as e:
            print(f"Error obteniendo secreto: {e}")
            return None

try:
    gestra_secret = get_secret("/gtfs/on-prem/gestra")
    if gestra_secret:
        print("✅ Secreto obtenido correctamente de AWS Secrets Manager")
        # Mostrar detalles del secreto (sin la contraseña)
        safe_secret = {k: ("******" if k == "password" else v) for k, v in gestra_secret.items()}
        print(f"Detalles: {safe_secret}")
    else:
        print("⚠️ No se pudo obtener el secreto de AWS, usando configuración local")
        gestra_secret = get_secret("/gtfs/on-prem/gestra", use_local=True)
except Exception as e:
    print(f"⚠️ Error conectando a AWS: {e}")
    print("Usando configuración local para pruebas")
    gestra_secret = get_secret("/gtfs/on-prem/gestra", use_local=True)

### Configuración JDBC para Oracle

In [None]:
# Configurar conexión JDBC para Oracle (si tenemos secretos)
if gestra_secret:
    # Formato 1 (estándar)
    jdbc_url = f"jdbc:oracle:thin:@//{gestra_secret['host']}:{gestra_secret['port']}/{gestra_secret['service']}"
    
    # Formato 2 (alternativo si el primero no funciona)
    # jdbc_url = f"jdbc:oracle:thin:{gestra_secret['username']}/{gestra_secret['password']}@{gestra_secret['host']}:{gestra_secret['port']}:{gestra_secret['service']}"
    
    # Formato 3 (con descripción completa)
    # jdbc_url = f"jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={gestra_secret['host']})(PORT={gestra_secret['port']}))(CONNECT_DATA=(SERVICE_NAME={gestra_secret['service']}))"
    
    connection_properties = {
        "user": gestra_secret['username'],
        "password": gestra_secret['password'],
        "driver": "oracle.jdbc.driver.OracleDriver"
    }
    
    print(f"JDBC URL: {jdbc_url}")
    print(f"Propiedades: {connection_properties['user']}, driver: {connection_properties['driver']}")
else:
    print("⚠️ No se ha configurado la conexión JDBC (secretos no disponibles)")

## 4. Prueba de Conectividad a Oracle

In [None]:
# Probar conexión a Oracle usando una consulta simple
if gestra_secret:
    try:
        # Intentar ejecutar una consulta básica
        test_query = """SELECT 1 as TEST_CONNECTION FROM DUAL"""
        
        test_df = spark.read.format("jdbc").options(
            url=jdbc_url,
            dbtable=f"({test_query})",
            user=connection_properties["user"],
            password=connection_properties["password"],
            driver=connection_properties["driver"]
        ).load()
        
        # Mostrar resultados
        test_df.show()
        print("✅ Conexión a Oracle exitosa")
    except Exception as e:
        print(f"❌ Error al conectar a Oracle: {e}")
        print("\nEsto puede deberse a:")
        print("  - Credenciales incorrectas")
        print("  - Problemas de red/firewall")
        print("  - Formato incorrecto de URL JDBC")
        print("  - Driver Oracle no disponible")
else:
    print("⚠️ No se puede probar la conexión sin configuración")

## 5. Crear Contract Codes

Ahora creamos la tabla `contract_codes` que se usa en el ETL:

In [None]:
# Crear contract codes como lo hace el ETL
contract_codes = []
for line in p_lines:
    contract_codes.append(
        spark.sql(
            f"SELECT '{p_empresa}' AS EMPRESA, '{line}' AS LINE, '{p_contr}' AS CONTRACT_CODE"
        )
    )

# Union all the DataFrames
final_contract_codes_df = contract_codes[0]
for df in contract_codes[1:]:
    final_contract_codes_df = final_contract_codes_df.union(df)

final_contract_codes_df.createOrReplaceTempView("contract_codes")
result = spark.sql("SELECT * FROM contract_codes")
result.show()

## 6. Consulta de Rutas (Routes)

Probemos la consulta routes que utiliza el ETL:

In [None]:
# Definir la consulta de rutas
routes_query = f'''
                    select
                        CAST(TRIM(t.CODLIN) AS STRING) as route_id,
                        CAST('{p_empresa}' AS STRING)||'_'||CAST('{p_contr}' AS STRING) as agency_id,
                        CAST(COALESCE(wb.CODLINWEB,t.CODLIN) AS STRING) as route_short_name,
                        CAST(COALESCE(wb.NOMWEB,t.NOMLIN) AS STRING) as route_long_name,
                        NULL as route_desc,
                        '3'   as route_type,
                        NULL as route_url,
                        COALESCE(wb.COLOR, 'CC0000') as route_color,
                        COALESCE(wb.COLOR, 'FFFFFF') as route_text_color,
                        NULL as route_sort_order,
                        NULL as continuous_pickup,
                        NULL as continuous_drop_off
                    from MAELIN t
                    left join PB_LINWEB wb on (wb.EMPRESA = t.EMPRESA and wb.CODLIN = t.CODLIN) 
                    INNER JOIN ( SELECT EMPRESA, LINE, CONTRACT_CODE 
                        FROM ( contract_codes) 
                    ) lc ON t.empresa = lc.EMPRESA AND t.codlin = lc.LINE
                    where t.EMPRESA = '{p_empresa}' and lc.CONTRACT_CODE = '{p_contr}'
                    and t.ESTADO = 1 
                    and t.ACTWEB = 'S'
                    and t.ACTGTR = 'S'
                    and t.CODLIN in (select distinct codlin from PB_CALENDCMR
                        where empresa = '{p_empresa}' 
                        and TO_DATE(fecha, 'yyyy-MM-dd') BETWEEN TO_DATE('{p_fechad}', 'yyyyMMdd') AND TO_DATE('{p_fechah}', 'yyyyMMdd')
                    )
            '''

print(f"Consulta SQL generada:")
print(routes_query)

In [None]:
# Ejecutar la consulta si tenemos conexión Oracle
if gestra_secret:
    try:
        routes_df = spark.read.format("jdbc").options(
            url=jdbc_url,
            dbtable=f"({routes_query})",
            user=connection_properties["user"],
            password=connection_properties["password"],
            driver=connection_properties["driver"]
        ).load()
        
        # Mostrar esquema y datos
        print("Esquema:")
        routes_df.printSchema()
        
        print("\nDatos:")
        routes_df.show()
    except Exception as e:
        print(f"❌ Error ejecutando consulta: {e}")
else:
    print("⚠️ No se puede ejecutar la consulta sin conexión Oracle")
    # Crear datos de ejemplo para desarrollo
    print("Creando datos de ejemplo para desarrollo...")
    
    routes_test_data = [
        ("001", f"{p_empresa}_{p_contr}", "001", "Línea 1 - Centro", None, "3", None, "CC0000", "FFFFFF", None, None, None),
        ("002", f"{p_empresa}_{p_contr}", "002", "Línea 2 - Estación", None, "3", None, "0000CC", "FFFFFF", None, None, None),
    ]
    
    routes_columns = ["route_id", "agency_id", "route_short_name", "route_long_name", "route_desc", 
                   "route_type", "route_url", "route_color", "route_text_color", 
                   "route_sort_order", "continuous_pickup", "continuous_drop_off"]
    
    routes_df = spark.createDataFrame(routes_test_data, routes_columns)
    routes_df.printSchema()
    routes_df.show()

## 7. Función Process del ETL

Ahora probemos la función principal `process` del ETL:

In [None]:
# Definir la función process como en el ETL original
def process(queries, P_EMPRESA, P_VERSION, P_CONTR, P_FECHAD, P_FECHAH, contract_codes):
    """
    Procesa las tablas: routes, stops, trips, calendar,
    calendar_dates, stop_times, trayectos
    """
    try:
        if "shapes" in queries.items():
            return
        else:
            for file_name, query in queries.items():
                try:
                    contract_codes_sql = spark.sql("SELECT * FROM contract_codes").toPandas()
                    sql_for_copy = "WITH contract_codes AS (\n"
                    for i, row in contract_codes_sql.iterrows():
                        sql_for_copy += f"    SELECT '{row['EMPRESA']}' AS EMPRESA, '{row['LINE']}' AS LINE, '{row['CONTRACT_CODE']}' AS CONTRACT_CODE FROM DUAL"
                        if i < len(contract_codes_sql) - 1:
                            sql_for_copy += " UNION ALL\n"
                        else:
                            sql_for_copy += "\n"
                    sql_for_copy += ")\n" + query.replace("( contract_codes )", "contract_codes")
                    logging.info(f"\n===== SQL DE ({file_name}) =====")
                    logging.info(sql_for_copy)
                except Exception as e:
                    logging.error(f"Error al crear SQL para depuración: {str(e)}")

                file_path = "".join(
                    f"GTFS/{file_name.upper()}"
                    f"/explotation={P_EMPRESA}"
                    f"/contract={P_CONTR}"
                    f"/version={P_VERSION}/"
                )

                logging.info(f"Procesando {file_name}...")
                
                if gestra_secret:
                    # Ejecutar la consulta contra Oracle
                    try:
                        df = spark.read.format("jdbc").options(
                            url=jdbc_url,
                            dbtable=f"({query})",
                            user=connection_properties["user"],
                            password=connection_properties["password"],
                            driver=connection_properties["driver"]
                        ).load()
                        logging.info(f"Consulta ejecutada correctamente para {file_name}")
                    except Exception as e:
                        logging.error(f"Error al ejecutar consulta para {file_name}: {str(e)}")
                        # Crear DataFrame vacío con estructura esperada para continuar
                        if file_name == "routes":
                            columns = ["route_id", "agency_id", "route_short_name", "route_long_name", "route_desc", 
                                      "route_type", "route_url", "route_color", "route_text_color", 
                                      "route_sort_order", "continuous_pickup", "continuous_drop_off"]
                            df = spark.createDataFrame([], schema=columns)
                        else:
                            df = spark.createDataFrame([])
                else:
                    # Modo offline - crear datos de ejemplo
                    logging.info(f"Modo OFFLINE: Generando datos de ejemplo para {file_name}")
                    if file_name == "routes":
                        df = routes_df  # Usar el DataFrame de ejemplo creado antes
                    else:
                        # Crear DataFrame vacío para otros archivos
                        df = spark.createDataFrame([])
                
                # Mostrar información del DataFrame
                logging.info(f"Esquema para {file_name}:")
                df.printSchema()
                logging.info(f"Muestra de datos para {file_name}:")
                df.show(5)
                
                # Simular escritura de archivo
                try:
                    logging.info(f"Simulando escritura en {file_path}{file_name}.txt")
                    # Aquí escribiríamos el archivo en S3 usando la lógica del ETL original
                    # ...
                except Exception as e:
                    logging.error(f"Error simulando escritura: {str(e)}")

    except Exception as e:
        logging.error(f"Error processing the event: {e}")
        raise

print("Función process definida correctamente")

### Prueba de Ejecución de la Función Process

In [None]:
# Definir consultas para probar
test_queries = {
    "routes": routes_query
}

# Ejecutar process
process(test_queries, p_empresa, p_version, p_contr, p_fechad, p_fechah, final_contract_codes_df)

## 8. Emulación de Salida del Job

Finalmente, emulamos la salida y el commit del Job como lo haría el script original:

In [None]:
# Generar salida similar a la del script original
response = {
    "statusCode": 200,
    "body": json.dumps(
        {
            "P_EMPRESA": p_empresa,
            "P_VERSION": p_version,
            "P_FECHAD": p_fechad,
            "P_FECHAH": p_fechah,
            "P_CONTR": p_contr,
        }
    ),
}

print("Respuesta final del job:")
print(json.dumps(response, indent=2))

# Commit del job
job.commit()
print("\nJob completado exitosamente")