<a href="https://colab.research.google.com/github/carlosgarcianter/duende_programador/blob/main/add_new_recon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Añadir nueva reconciliacion

## Obtener campos de la reconciliacion y mapeo

Debemos obtener el PDF de configuración de DU.CO de la reconciliación. En uno de sus apartados veremos los campos Matched, Left side, Right side y Calculated. Esos campos tienen su equivalente en Dataplatform y para obtener su nombre nos podemos guiar de la siguiente forma:

- Los nombres en DP no hace distinción entre mayusculas y minúsculas.
- Los espacios se sustituyen por "_".
- Los acentos se sustituyen por "_".
- Los simboloes especiales como "(", ")", "-", ... se sustituyen por "_".
- En algunas ocasiones aparece Nº, Lo llamaremos "Numero" en la clave de nuestro diccionario y en DP se sustituye "º" por "_".
* Cuidado con algunos nombres, hemos llegado a ver nombres que incluyen "," y puede parecer que es el siguiente nombre de la lista.

Como ayuda para comprobar el formateo de los campos se puede lanzar una query con cloud run con anulaciones y los campos:

1. Product: QUERYMPRO
2. Query:

```
  SELECT DISTINCT(d_Fecha_Contrataci_n) FROM bbva_data.results_last_month WHERE process_id='OTCIVOPS' LIMIT 100
```



In [None]:
PROCESS_CODE = "OTCIVOPS"
TABLE_BASE = "CCSW_191CalyOtcIrsOper"
TABLE_BRZ = f"{TABLE_BASE}_BRZ"

TABLE_HIST = "191_HIST_MULTILOADER"
PRODUCT_RECON = "CCSW_191"

In [None]:
common_fields = {
  "Id":                   "r.url_id",
  "Process_Code":         "r.process_id",
  "Run_ID":               "CAST(CAST(r.run_number AS FLOAT) AS INTEGER) AS Run_ID",
  "Roll_up_relationship": "r.roll_up_relationship",
  "Workflow":             "e.workflow",
  "Status":               "r.status",
  "`Group`":                "e.group_id",
  "Assigned_to":          "e.assigned_to_user_id",
  "Ageing_started_at":    "e.creation_timestamp",
  "Age_as_at_export":     "CAST(e.age AS FLOAT) as Age_as_at_export",
  "Resolved_By":          "e.resolved_by_user_id",
  "Latest_Activity":      "'' AS Latest_Activity",
  "Score":                "r.score",
  "`Input`":                "r.input",
  "Labels":               "l.labels AS Labels",
}

common_fields_clean = {
  "Id":                       "STRING",
  "Id_Incremental":           "STRING",
  "Process_Code":             "STRING",
  "Run_ID":                   "INT64",
  "Roll_up_relationship":     "STRING",
  "Workflow":                 "STRING",
  "Status":                   "STRING",
  "`Group`":                    "STRING",
  "Assigned_to":              "STRING",
  "Fecha_Ageing_started_at":  "DATETIME",
  "Float_Age_as_at_export":   "FLOAT64",
  "Resolved_By":              "STRING",
  "Latest_Activity":          "STRING",
  "Score":                    "STRING",
  "`Input`":                    "STRING",
  "Labels":                   "STRING",
}

common_fields_casted = {
  "Id": "",
  "CONCAT(Id, \"-\", `Input`) AS Id_Incremental": "",
  "Process_Code": "",
  "SAFE_CAST(Run_ID AS INT64) AS Run_ID": "",
  "Roll_up_relationship": "",
  "Workflow": "",
  "Status": "",
  "`Group`": "",
  "Assigned_to": "",
  "SAFE_CAST(Ageing_started_at AS DATETIME) AS Fecha_Ageing_started_at": "",
  "SAFE_CAST(REPLACE(Age_as_at_export, \",\", \".\") AS FLOAT64) AS Float_Age_as_at_export": "",
  "Resolved_By": "",
  "Latest_Activity": "",
  "Score": "",
  "`Input`": "",
  "Labels": "",
}

# ==============================================================================
# ==============================================================================

matched_fields = {
  "Numero_Operacion":         "r.s_N_mero_Operaci_n",
  "Buy_Sell":                 "r.s_Buy_Sell",
  "Entidad":                  "r.s_Entidad",
  "Producto":                 "r.s_Producto",
  "Portfolio":                "r.s_Portfolio",
  "Contrapartida":            "r.s_Contrapartida",
  "Nominal_Cobro":            "r.f_Nominal_Cobro",
  "Divisa_Cobro":             "r.s_Divisa_Cobro",
  "Nominal_Pago":             "r.f_Nominal_Pago",
  "Divisa_Pago":              "r.s_Divisa_Pago",
  "Fecha_Contratacion":       "r.d_Fecha_Contrataci_n",
  "Fecha_Vencimiento":        "r.d_Fecha_Vencimiento",
  "Interna":                  "r.s_Interna",
  "Mirror_Trade":             "r.s_Mirror_Trade",
  "Status_191":               "r.s_Status",
  "Nominal_Cobro_Crv_Eur":    "r.f_Nominal_Cobro_Crv_Eur",
  "Nominal_Pago_Crv_Eur":     "r.f_Nominal_Pago_Crv_Eur",
  "Origen":                   "r.s_Origen",
}

left_fields = {
  "Hora_Liberacion":  "r.rl_s_Hora_Liberaci_n",
  "Estructura":       "r.rl_s_Estructura",
}

right_fields = {
  "Trade_Id":         "r.rr_s_Trade_Id",
  "Product_Type":     "r.rr_s_Product_Type",
  "Fecha_Liberacion": "r.rr_d_Fecha_Liberaci_n",
  "Pay_Leg_Type":     "r.rr_s_Pay_Leg_Type",
  "Rcv_Leg_Type":     "r.rr_s_Rcv_Leg_Type",
  "Operation_Type":   "r.rr_s_Operation_Type",
}

calculated_fields = {
}

# ==============================================================================
# ==============================================================================

last_common_fields = {
  "Fecha_Ejecucion":  "r.update_timestamp AS Fecha_Ejecucion",
}

last_common_clean = {
  "fecha_ejecucion_date": "DATE",
  "audit_date_time": "DATETIME",
  "origin": "STRING"
}

last_common_casted = {
  "DATE(SAFE_CAST(Fecha_Ejecucion AS DATETIME)) AS fecha_ejecucion_date": "DATE",
  "CURRENT_DATETIME(\"Europe/Madrid\") AS Audit_Date_Time": "DATETIME",
  f"'{PRODUCT_RECON}_Carga_Inicial' AS Origin": "STRING"
}

## Crear tabla BRZ_TMP

Tras terminar la implementacion de la funcion del proyecto dashboard, que usaremos para automatizar la descarga desde Frontal o la descarga manual desde la cloud run, tenemos que dejar creada la tabla destino

```
reconciliations.[Nombre_recon]_BRZ_TMP
```

In [None]:
params = ",\n\t".join(f"{k} STRING" for k in (common_fields|matched_fields|left_fields|right_fields|calculated_fields|last_common_fields).keys())
query = f"""
    CREATE TABLE `ALCONFOBO.{TABLE_BRZ}_TMP` (
        {params}
    );
"""
print(query)


    CREATE TABLE `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ_TMP` (
        Id STRING,
	Process_Code STRING,
	Run_ID STRING,
	Roll_up_relationship STRING,
	Workflow STRING,
	Status STRING,
	`Group` STRING,
	Assigned_to STRING,
	Ageing_started_at STRING,
	Age_as_at_export STRING,
	Resolved_By STRING,
	Latest_Activity STRING,
	Score STRING,
	`Input` STRING,
	Labels STRING,
	Numero_Operacion STRING,
	Buy_Sell STRING,
	Entidad STRING,
	Producto STRING,
	Portfolio STRING,
	Contrapartida STRING,
	Nominal_Cobro STRING,
	Divisa_Cobro STRING,
	Nominal_Pago STRING,
	Divisa_Pago STRING,
	Fecha_Contratacion STRING,
	Fecha_Vencimiento STRING,
	Interna STRING,
	Mirror_Trade STRING,
	Status_191 STRING,
	Nominal_Cobro_Crv_Eur STRING,
	Nominal_Pago_Crv_Eur STRING,
	Origen STRING,
	Hora_Liberacion STRING,
	Estructura STRING,
	Trade_Id STRING,
	Product_Type STRING,
	Fecha_Liberacion STRING,
	Pay_Leg_Type STRING,
	Rcv_Leg_Type STRING,
	Operation_Type STRING,
	Fecha_Ejecucion STRING
    );



Si lo deseamos podemos hacer una prueba para comprobar que los campos han sido mapeados correctamente y ejecutar manualmente la cloud run.
Si todo es correcto deberiamos de ver informacion en la tabla BRZ_TMP

## Solicitar autorizacion de campos

Debemos solicitar mediante un ticket de JIRA que nos autoricen el uso de los campos.

Como ejemplo dejamos el siguiente ticket:  [JIRA - ATHCIB-42755](https://cibproducts.grupobbva.com/JIRA/browse/ATHCIB-42755)

PD: Para que sea más fácil ejecuta las siguientes celdas ocultas que te listarán los headers



In [None]:
print(", ".join(f"{k}" for k in (common_fields|matched_fields|left_fields|right_fields|calculated_fields|last_common_fields).keys()))

Id, Process_Code, Run_ID, Roll_up_relationship, Workflow, Status, `Group`, Assigned_to, Ageing_started_at, Age_as_at_export, Resolved_By, Latest_Activity, Score, `Input`, Labels, Numero_Operacion, Buy_Sell, Entidad, Producto, Portfolio, Contrapartida, Nominal_Cobro, Divisa_Cobro, Nominal_Pago, Divisa_Pago, Fecha_Contratacion, Fecha_Vencimiento, Interna, Mirror_Trade, Status_191, Nominal_Cobro_Crv_Eur, Nominal_Pago_Crv_Eur, Origen, Hora_Liberacion, Estructura, Trade_Id, Product_Type, Fecha_Liberacion, Pay_Leg_Type, Rcv_Leg_Type, Operation_Type, Fecha_Ejecucion


## Historificar datos previos de la reconciliacion

Para ello crearemos la tabla [Numero_recon]_HIST_MULTILOADER (Mismo schema que BRZ_TMP) y ejecutaremos la cloud run para mover los datos

### Crear tabla HIST_MULTILOADER

In [None]:
params = ",\n\t".join(f"`{k}` STRING" for k in (common_fields|matched_fields|left_fields|right_fields|calculated_fields|last_common_fields).keys())
query = f"""
    CREATE TABLE `reconciliations.{TABLE_HIST}` (
        {params}
    );
"""
print(query)


    CREATE TABLE `reconciliations.191_HIST_MULTILOADER` (
        `Id` STRING,
	`Process_Code` STRING,
	`Run_ID` STRING,
	`Roll_up_relationship` STRING,
	`Workflow` STRING,
	`Status` STRING,
	``Group`` STRING,
	`Assigned_to` STRING,
	`Ageing_started_at` STRING,
	`Age_as_at_export` STRING,
	`Resolved_By` STRING,
	`Latest_Activity` STRING,
	`Score` STRING,
	``Input`` STRING,
	`Labels` STRING,
	`Numero_Operacion` STRING,
	`Buy_Sell` STRING,
	`Entidad` STRING,
	`Producto` STRING,
	`Portfolio` STRING,
	`Contrapartida` STRING,
	`Nominal_Cobro` STRING,
	`Divisa_Cobro` STRING,
	`Nominal_Pago` STRING,
	`Divisa_Pago` STRING,
	`Fecha_Contratacion` STRING,
	`Fecha_Vencimiento` STRING,
	`Interna` STRING,
	`Mirror_Trade` STRING,
	`Status_191` STRING,
	`Nominal_Cobro_Crv_Eur` STRING,
	`Nominal_Pago_Crv_Eur` STRING,
	`Origen` STRING,
	`Hora_Liberacion` STRING,
	`Estructura` STRING,
	`Trade_Id` STRING,
	`Product_Type` STRING,
	`Fecha_Liberacion` STRING,
	`Pay_Leg_Type` STRING,
	`Rcv_Leg_Type` STRING,


### Ejecutar cloud run:
1. Product: MULTLOAD
2. schema: bbva_data
3. view_range: _last_year
4. process_code: ------------> SUSTITUIR
5. run_id_from:
6. run_id_to:
7. run_id_interval: 3 -------> Si da problema de memoria poner a 1
8. select_fields: -----------> Siguiente celda oculta
9. header_fields: -----------> Segunda celda oculta
10. table_id: ---------------> Tabla que acabamos de crear

In [None]:
print(", ".join(f"{k}" for k in (common_fields|matched_fields|left_fields|right_fields|calculated_fields|last_common_fields).values()))

r.url_id, r.process_id, CAST(CAST(r.run_number AS FLOAT) AS INTEGER) AS Run_ID, r.roll_up_relationship, e.workflow, r.status, e.group_id, e.assigned_to_user_id, e.creation_timestamp, CAST(e.age AS FLOAT) as Age_as_at_export, e.resolved_by_user_id, '' AS Latest_Activity, r.score, r.input, l.labels AS Labels, r.s_N_mero_Operaci_n, r.s_Buy_Sell, r.s_Entidad, r.s_Producto, r.s_Portfolio, r.s_Contrapartida, r.f_Nominal_Cobro, r.s_Divisa_Cobro, r.f_Nominal_Pago, r.s_Divisa_Pago, r.d_Fecha_Contrataci_n, r.d_Fecha_Vencimiento, r.s_Interna, r.s_Mirror_Trade, r.s_Status, r.f_Nominal_Cobro_Crv_Eur, r.f_Nominal_Pago_Crv_Eur, r.s_Origen, r.rl_s_Hora_Liberaci_n, r.rl_s_Estructura, r.rr_s_Trade_Id, r.rr_s_Product_Type, r.rr_d_Fecha_Liberaci_n, r.rr_s_Pay_Leg_Type, r.rr_s_Rcv_Leg_Type, r.rr_s_Operation_Type, r.update_timestamp AS Fecha_Ejecucion


In [None]:
print(", ".join(f"{k}" for k in (common_fields|matched_fields|left_fields|right_fields|calculated_fields|last_common_fields).keys()))

Id, Process_Code, Run_ID, Roll_up_relationship, Workflow, Status, `Group`, Assigned_to, Ageing_started_at, Age_as_at_export, Resolved_By, Latest_Activity, Score, `Input`, Labels, Numero_Operacion, Buy_Sell, Entidad, Producto, Portfolio, Contrapartida, Nominal_Cobro, Divisa_Cobro, Nominal_Pago, Divisa_Pago, Fecha_Contratacion, Fecha_Vencimiento, Interna, Mirror_Trade, Status_191, Nominal_Cobro_Crv_Eur, Nominal_Pago_Crv_Eur, Origen, Hora_Liberacion, Estructura, Trade_Id, Product_Type, Fecha_Liberacion, Pay_Leg_Type, Rcv_Leg_Type, Operation_Type, Fecha_Ejecucion


### Añadir datos de HIST_MULTILOADER a GLD

In [None]:
# ==============================================================================
# Obtener fecha más actual de cada registro
# ==============================================================================

first_query = f"""
  CREATE TABLE reconciliations.{TABLE_HIST}_MAX_REG_AUX AS
  SELECT ID, MAX(Fecha_Ejecucion) as Fecha_Ejecucion
  FROM bbva-duco-dashboard.reconciliations.{TABLE_HIST}
  GROUP BY ID;
"""

# ==============================================================================
# Crear tabla con registro más actualizados
# ==============================================================================

second_query = f"""
  CREATE TABLE reconciliations.{TABLE_HIST}_GLD AS
  SELECT A.*
  FROM bbva-duco-dashboard.reconciliations.{TABLE_HIST} A
  INNER JOIN bbva-duco-dashboard.reconciliations.{TABLE_HIST}_MAX_REG_AUX B
    ON A.ID = B.ID AND A.Fecha_Ejecucion = B.Fecha_Ejecucion;
"""

# ==============================================================================
# Tipado de campos especificos de la reconciliacion
# ==============================================================================

matched_fields_clean = {}
for k, v in matched_fields.items():
    new_v = v
    new_v = new_v.replace("r.", "")
    matched_fields_clean[k] = new_v

left_fields_clean = {}
for k, v in left_fields.items():
    new_v = v
    new_v = new_v.replace("r.rl_", "")
    left_fields_clean[k] = new_v

right_fields_clean = {}
for k, v in right_fields.items():
    new_v = v
    new_v = new_v.replace("r.rr_", "")
    right_fields_clean[k] = new_v

calculated_fields_clean = {}
for k, v in calculated_fields.items():
    new_v = v
    new_v = new_v.replace("r.cf_", "")
    calculated_fields_clean[k] = new_v

rules = {
    "b_": "BOOLEAN",
    "d_": "DATE",
    "dt_": "DATETIME",
    "f_": "FLOAT64",
    "i_": "INT64",
    "s_": "STRING",
    "t_": "TIME",
}

typed_recon_fields = {}
for k, v in (matched_fields_clean|left_fields_clean|right_fields_clean|calculated_fields_clean).items():
    tipo = None
    for prefix, t in rules.items():
        if v.startswith(prefix):
            tipo = t
            break
    typed_recon_fields[k] = tipo if tipo else "UNKNOWN"  # fallback por si no hay regla

rename_fields = {}
for k, v in typed_recon_fields.items():
    if v == "FLOAT64":
        new_key = f"Float_{k}"
    elif v == "INT64":
        new_key = f"Int_{k}"
    elif v == "DATE":
        new_key = f"Fecha_{k}"
    else:
        new_key = k
    rename_fields[new_key] = v


# ==============================================================================
# Crear tabla GLD
# ==============================================================================

TABLE_GLD = TABLE_BRZ.replace("BRZ", "GLD")

params = ",\n\t".join(f"`{k}` {v}" for k, v in (common_fields_clean|rename_fields|last_common_clean).items())

third_query = f"""
  CREATE TABLE `ALCONFOBO.{TABLE_GLD}` (
    \t{params}
  );
"""

# ==============================================================================
# Casteo de los datos de HIST a tipado de GLD
# ==============================================================================

casted_recon_fields = {}
for k, v in typed_recon_fields.items():
    if v == "FLOAT64":
        new_key = f"SAFE_CAST(REPLACE({k}, \",\", \".\") AS {v}) AS Float_{k}"
    elif v == "INT64":
        new_key = f"SAFE_CAST({k} AS {v}) AS Int_{k}"
    elif v == "DATE":
        new_key = f"SAFE_CAST({k} AS {v}) AS Fecha_{k}"
    else:
        new_key = k
    casted_recon_fields[new_key] = v

keys = ",\n\t".join(f"`{k}`" for k in (common_fields_clean|rename_fields|last_common_clean).keys())
params = ",\n\t".join(f"{k}" for k in (common_fields_casted|casted_recon_fields|last_common_casted).keys())
fourth_query = f"""
  INSERT INTO `ALCONFOBO.{TABLE_GLD}` (
    \t{keys}
  )
  SELECT
    \t{params}
  FROM `reconciliations.{TABLE_HIST}_GLD`
"""

#===============================================================================

query = first_query + "\n" + second_query + "\n" + third_query + "\n" + fourth_query
print(query)


  CREATE TABLE reconciliations.191_HIST_MULTILOADER_MAX_REG_AUX AS
  SELECT ID, MAX(Fecha_Ejecucion) as Fecha_Ejecucion
  FROM bbva-duco-dashboard.reconciliations.191_HIST_MULTILOADER
  GROUP BY ID;


  CREATE TABLE reconciliations.191_HIST_MULTILOADER_GLD AS
  SELECT A.*
  FROM bbva-duco-dashboard.reconciliations.191_HIST_MULTILOADER A
  INNER JOIN bbva-duco-dashboard.reconciliations.191_HIST_MULTILOADER_MAX_REG_AUX B
    ON A.ID = B.ID AND A.Fecha_Ejecucion = B.Fecha_Ejecucion;


  CREATE TABLE `ALCONFOBO.CCSW_191CalyOtcIrsOper_GLD` (
    	`Id` STRING,
	`Id_Incremental` STRING,
	`Process_Code` STRING,
	`Run_ID` INT64,
	`Roll_up_relationship` STRING,
	`Workflow` STRING,
	`Status` STRING,
	``Group`` STRING,
	`Assigned_to` STRING,
	`Fecha_Ageing_started_at` DATETIME,
	`Float_Age_as_at_export` FLOAT64,
	`Resolved_By` STRING,
	`Latest_Activity` STRING,
	`Score` STRING,
	``Input`` STRING,
	`Labels` STRING,
	`Numero_Operacion` STRING,
	`Buy_Sell` STRING,
	`Entidad` STRING,
	`Producto` STRI

## DATAFORMS

In [None]:
rename_fields

{'Numero_Operacion': 'STRING',
 'Buy_Sell': 'STRING',
 'Entidad': 'STRING',
 'Producto': 'STRING',
 'Portfolio': 'STRING',
 'Contrapartida': 'STRING',
 'Float_Nominal_Cobro': 'FLOAT64',
 'Divisa_Cobro': 'STRING',
 'Float_Nominal_Pago': 'FLOAT64',
 'Divisa_Pago': 'STRING',
 'Fecha_Fecha_Contratacion': 'DATE',
 'Fecha_Fecha_Vencimiento': 'DATE',
 'Interna': 'STRING',
 'Mirror_Trade': 'STRING',
 'Status_191': 'STRING',
 'Float_Nominal_Cobro_Crv_Eur': 'FLOAT64',
 'Float_Nominal_Pago_Crv_Eur': 'FLOAT64',
 'Origen': 'STRING',
 'Hora_Liberacion': 'STRING',
 'Estructura': 'STRING',
 'Trade_Id': 'STRING',
 'Product_Type': 'STRING',
 'Fecha_Fecha_Liberacion': 'DATE',
 'Pay_Leg_Type': 'STRING',
 'Rcv_Leg_Type': 'STRING',
 'Operation_Type': 'STRING'}

In [None]:
TABLE_BRZ_HIST = f"{TABLE_BASE}_BRZ_HIST"
TABLE_BRZ_TMP = f"{TABLE_BASE}_BRZ_TMP"
TABLE_SLV = f"{TABLE_BASE}_SLV"
TABLE_GLD = f"{TABLE_BASE}_GLD"

RECON_NUMBER = "Recon191"

### BRZ

In [None]:
query = f"""
config {{
  type: "declaration",
  name: "{TABLE_BRZ}",
  schema: "ALCONFOBO"
}}
"""
print(query)


config {
  type: "declaration",
  name: "CCSW_191CalyOtcIrsOper_BRZ",
  schema: "ALCONFOBO"
}



### BRZ_STG

In [None]:
query = f"""
config {{
  type: "operations",
  name: "move_{TABLE_BRZ}_from_TMP_to_BRZ_and_HIST",
  tags: ["bronze", "load", "hist", "{RECON_NUMBER}"]
}}

-- Paso 1: Obtener la Fecha_Ejecucion más antigua
DECLARE fecha_min DATE;

SET fecha_min = (
  SELECT MIN(DATE (Fecha_Ejecucion))
  FROM `ALCONFOBO.{TABLE_BRZ_TMP}`
);

-- Paso 1.1: Validar si hay datos. Solo continuar si fecha_min no es NULL
IF fecha_min IS NOT NULL THEN

  -- Paso 2.1: Limpiar la tabla BRZ antes de cargar nuevos registros
  DELETE FROM `ALCONFOBO.{TABLE_BRZ}` WHERE TRUE;

  -- Paso 2.2: Insertar en tabla BRZ definitiva
  INSERT INTO `ALCONFOBO.{TABLE_BRZ}`
  SELECT *
  FROM `ALCONFOBO.{TABLE_BRZ_TMP}`
  WHERE DATE (Fecha_Ejecucion) = fecha_min;

  -- Paso 3: Insertar en tabla histórica con marca de fecha (Madrid)
  INSERT INTO `ALCONFOBO.{TABLE_BRZ_HIST}`
  SELECT
    t.*,
    DATE(CURRENT_DATETIME("Europe/Madrid")) AS Fecha_Historificacion
  FROM `ALCONFOBO.{TABLE_BRZ_TMP}` t
  WHERE DATE (Fecha_Ejecucion) = fecha_min;

  -- Paso 4: Borrar de la temporal
  DELETE FROM `ALCONFOBO.{TABLE_BRZ_TMP}`
  WHERE DATE (Fecha_Ejecucion) = fecha_min;

END IF;
"""
print(query)


config {
  type: "operations",
  name: "move_CCSW_191CalyOtcIrsOper_BRZ_from_TMP_to_BRZ_and_HIST",
  tags: ["bronze", "load", "hist", "Recon191"]
}

-- Paso 1: Obtener la Fecha_Ejecucion más antigua
DECLARE fecha_min DATE;

SET fecha_min = (
  SELECT MIN(DATE (Fecha_Ejecucion))
  FROM `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ_TMP`
);

-- Paso 1.1: Validar si hay datos. Solo continuar si fecha_min no es NULL
IF fecha_min IS NOT NULL THEN

  -- Paso 2.1: Limpiar la tabla BRZ antes de cargar nuevos registros
  DELETE FROM `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ` WHERE TRUE;

  -- Paso 2.2: Insertar en tabla BRZ definitiva
  INSERT INTO `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ`
  SELECT *
  FROM `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ_TMP`
  WHERE DATE (Fecha_Ejecucion) = fecha_min;

  -- Paso 3: Insertar en tabla histórica con marca de fecha (Madrid)
  INSERT INTO `ALCONFOBO.CCSW_191CalyOtcIrsOper_BRZ_HIST`
  SELECT
    t.*,
    DATE(CURRENT_DATETIME("Europe/Madrid")) AS Fecha_Historificacion
  FRO

### SLV

In [None]:
keys = ",\n\t".join(f"{k}" for k in (common_fields_casted|casted_recon_fields).keys())

query = f"""
config {{
  type: "table",
  name: "{TABLE_SLV}",
  tags: ["silver", "{RECON_NUMBER}"],
  dependencies: ["move_{TABLE_BRZ}_from_TMP_to_BRZ_and_HIST"]
}}

SELECT
  \t{keys},
  \tDATE(SAFE_CAST(Fecha_Ejecucion AS DATETIME)) AS Fecha_Ejecucion_Date,
  \tCURRENT_DATETIME('Europe/Madrid') AS Audit_Date_Time,
  \t'{TABLE_BRZ}' AS Origin
FROM ${{ref("{TABLE_BRZ}")}}
"""

print(query)


config {
  type: "table",
  name: "CCSW_191CalyOtcIrsOper_SLV",
  tags: ["silver", "Recon191"],
  dependencies: ["move_CCSW_191CalyOtcIrsOper_BRZ_from_TMP_to_BRZ_and_HIST"]
}

SELECT
  	Id,
	CONCAT(Id, "-", `Input`) AS Id_Incremental,
	Process_Code,
	SAFE_CAST(Run_ID AS INT64) AS Run_ID,
	Roll_up_relationship,
	Workflow,
	Status,
	`Group`,
	Assigned_to,
	SAFE_CAST(Ageing_started_at AS DATETIME) AS Fecha_Ageing_started_at,
	SAFE_CAST(REPLACE(Age_as_at_export, ",", ".") AS FLOAT64) AS Float_Age_as_at_export,
	Resolved_By,
	Latest_Activity,
	Score,
	`Input`,
	Labels,
	Numero_Operacion,
	Buy_Sell,
	Entidad,
	Producto,
	Portfolio,
	Contrapartida,
	SAFE_CAST(REPLACE(Nominal_Cobro, ",", ".") AS FLOAT64) AS Float_Nominal_Cobro,
	Divisa_Cobro,
	SAFE_CAST(REPLACE(Nominal_Pago, ",", ".") AS FLOAT64) AS Float_Nominal_Pago,
	Divisa_Pago,
	SAFE_CAST(Fecha_Contratacion AS DATE) AS Fecha_Fecha_Contratacion,
	SAFE_CAST(Fecha_Vencimiento AS DATE) AS Fecha_Fecha_Vencimiento,
	Interna,
	Mirror_Trade,
	St

###GLD

In [None]:
update = ",\n\t".join(f"`{k}` = source.`{k}`" for k in (common_fields_clean|rename_fields).keys())
insert = ",\n\t".join(f"`{k}`" for k in (common_fields_clean|rename_fields).keys())
values = ",\n\t".join(f"source.`{k}`" for k in (common_fields_clean|rename_fields).keys())

query = f"""
config {{
  type: "operations",
  name: "{TABLE_GLD}",
  tags: ["gold", "{RECON_NUMBER}"],
  dependencies: ["{TABLE_SLV}"]
}}

MERGE INTO `ALCONFOBO.{TABLE_GLD}` AS target
USING (
  SELECT
    *
  FROM ${{ref("{TABLE_SLV}")}}
) AS source
ON target.Id_Incremental = source.Id_Incremental

WHEN MATCHED THEN
  UPDATE SET
  \t{update},
  \t`Fecha_Ejecucion_Date` = source.`Fecha_Ejecucion_Date`,
  \t`Audit_Date_Time` = DATETIME(CURRENT_TIMESTAMP(), "Europe/Madrid"),
  \t`Origin` = '{TABLE_SLV}-Actualizacion'
WHEN NOT MATCHED THEN
  INSERT (
  \t{insert},
  \t`Fecha_Ejecucion_Date`,
  \t`Audit_Date_Time`,
  \t`Origin`
  )
  VALUES (
  \t{values},
  \t`Fecha_Ejecucion_Date`,
  \tCURRENT_DATETIME('Europe/Madrid'),
  \t'{TABLE_SLV}-Nuevo'
  );
"""

print(query)


config {
  type: "operations",
  name: "CCSW_191CalyOtcIrsOper_GLD",
  tags: ["gold", "Recon191"],
  dependencies: ["CCSW_191CalyOtcIrsOper_SLV"]
}

MERGE INTO `ALCONFOBO.CCSW_191CalyOtcIrsOper_GLD` AS target
USING (
  SELECT
    *
  FROM ${ref("CCSW_191CalyOtcIrsOper_SLV")}
) AS source
ON target.Id_Incremental = source.Id_Incremental

WHEN MATCHED THEN
  UPDATE SET
  	`Id` = source.`Id`,
	`Id_Incremental` = source.`Id_Incremental`,
	`Process_Code` = source.`Process_Code`,
	`Run_ID` = source.`Run_ID`,
	`Roll_up_relationship` = source.`Roll_up_relationship`,
	`Workflow` = source.`Workflow`,
	`Status` = source.`Status`,
	``Group`` = source.``Group``,
	`Assigned_to` = source.`Assigned_to`,
	`Fecha_Ageing_started_at` = source.`Fecha_Ageing_started_at`,
	`Float_Age_as_at_export` = source.`Float_Age_as_at_export`,
	`Resolved_By` = source.`Resolved_By`,
	`Latest_Activity` = source.`Latest_Activity`,
	`Score` = source.`Score`,
	``Input`` = source.``Input``,
	`Labels` = source.`Labels`,
	`Nume