<a href="https://colab.research.google.com/github/DavidP0011/etl_functions/blob/main/etl_hs_sensitive_02st.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# INITIALIZATION

In [1]:
# @title INSTALL THE PY FUNCTIONS REPOSITORY
!pip install --force-reinstall git+https://github.com/DavidP0011/functions_for_notebooks@main


Collecting git+https://github.com/DavidP0011/functions_for_notebooks@main
  Cloning https://github.com/DavidP0011/functions_for_notebooks (to revision main) to /tmp/pip-req-build-plneta6u
  Running command git clone --filter=blob:none --quiet https://github.com/DavidP0011/functions_for_notebooks /tmp/pip-req-build-plneta6u
  Resolved https://github.com/DavidP0011/functions_for_notebooks to commit b1f35391ea69c67e0c73f34d1bfdea454f46a8a9
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
Building wheels for collected packages: funcones-py-notebooks
  Building wheel for funcones-py-notebooks (pyproject.toml) ... done
  Created wheel for funcones-py-notebooks: filename=funcones_py_notebooks-0.0.0-py3-none-any.whl size=70177 sha256=3f3f9975b743e7761fd6d5b5b56597f6b7ce9b55bb92effe864d731cef02f40b
  Stored in directory: /tmp/pip-ephem-wheel-cache-unogijl3/wheels

In [2]:
# @title ENVIRONMENT IDENTIFICATION, GOOGLE DRIVE SETUP

from common.dpm_GCP_ini import ini_environment_identification, ini_google_drive_instalation

# Detect the runtime environment
ini_environment_identificated = ini_environment_identification()
print(f"[INFO ℹ️] Entorno detectado: {ini_environment_identificated}", flush=True)

# Mount Google Drive if entorno_identificado_str is Colab
params = {"entorno_identificado_str": ini_environment_identificated}
ini_google_drive_instalation(params)

# Declare the credential paths
GCP_json_keyfile_local = r"C:/api_keys/XXX.json"
GCP_json_keyfile_colab = "/content/drive/MyDrive/ANIMUM DIRECCION/DIRECCION BI/NOTEBOOKS/api_keys/animum-dev-datawarehouse-google-colab.json"
GCP_json_keyfile_GCP_secret_id = "notebook-vm"

[INFO ℹ️] Entorno detectado: COLAB
Mounted at /content/drive
[INFO ℹ️] Google Drive montado correctamente.


In [14]:
# @title secrets_get_dic()

# __________________________________________________________________________________________________________________________________________________________
# secrets_get_dic
# __________________________________________________________________________________________________________________________________________________________
def secrets_get_dic(config: dict) -> dict:
    """
    Retrieves secrets from Secret Manager and returns them as a {secret_id: value} dictionary.
    Does not touch os.environ.

    Args:
        config (dict):
            - project_id (str): GCP project ID.
            - secrets_list (list[str]): List of secret IDs to read.
            - ini_environment_identificated (str, optional): "LOCAL" | "COLAB" | "COLAB_ENTERPRISE" | <other>.
            - json_keyfile_local (str, optional): JSON key path for the LOCAL environment.
            - json_keyfile_colab (str, optional): JSON key path for the COLAB environment.
            - json_keyfile_GCP_secret_id (str, optional): Secret ID holding the JSON key for a GCP environment.

    Returns:
        dict: Map {secret_id: secret_value}. Secrets that could not be read are omitted.

    Raises:
        ValueError: If required parameters are missing or have invalid types.
        Exception: If access to Secret Manager fails.
    """
    import time
    from google.cloud import secretmanager

    project_id_str = config.get("project_id")
    secrets_list = config.get("secrets_list")
    if not project_id_str:
        raise ValueError("[VALIDATION [ERROR ❌]] Falta 'project_id' en config.")
    if not secrets_list or not isinstance(secrets_list, list):
        raise ValueError("[VALIDATION [ERROR ❌]] Falta 'secrets_list' (list) en config.")

    print("🔹🔹🔹 [START ▶️] Lectura de secretos (in-memory) 🔹🔹🔹", flush=True)
    t0 = time.time()

    try:
        credentials = _ini_authenticate_API(config, project_id_str)
        client_sm = secretmanager.SecretManagerServiceClient(credentials=credentials)
        print("[AUTHENTICATION SUCCESS ✅] Credenciales obtenidas.", flush=True)

        secrets_dic = {}
        warnings_int = 0
        for secret_id in secrets_list:
            try:
                name = f"projects/{project_id_str}/secrets/{secret_id}/versions/latest"
                response = client_sm.access_secret_version(name=name)
                secrets_dic[secret_id] = response.payload.data.decode("UTF-8")
                print(f"[SECRET SUCCESS ✅] '{secret_id}' leído.", flush=True)
            except Exception as e:
                warnings_int += 1
                print(f"[SECRET WARNING ⚠️] No se pudo leer '{secret_id}': {e}", flush=True)

        elapsed = round(time.time() - t0, 2)
        print("🔹🔹🔹 [METRICS 📊] Resumen 🔹🔹🔹", flush=True)
        print(f"[METRICS INFO ℹ️] Secretos solicitados: {len(secrets_list)}", flush=True)
        print(f"[METRICS INFO ℹ️] Advertencias: {warnings_int}", flush=True)
        print(f"[END FINISHED ✅] Tiempo total: {elapsed} s", flush=True)
        return secrets_dic

    except Exception as e:
        # Chain the original exception so the full traceback is preserved
        raise Exception(f"[PROCESS ERROR ❌] Fallo en secrets_get_dic: {e}") from e



In [23]:
# @title IMPORT HS SECRETS

from common.dpm_GCP_ini import _ini_authenticate_API

# Config updated to include both the access token and the client secret
config = {
    "project_id": "animum-dev-datawarehouse",
    "ini_environment_identificated": ini_environment_identificated,
    "json_keyfile_local": GCP_json_keyfile_local,
    "json_keyfile_colab": GCP_json_keyfile_colab,
    "json_keyfile_GCP_secret_id": GCP_json_keyfile_GCP_secret_id,
    "secrets_list": [
        "hs_datawarehouse_sensitive_acces_token",    # Existing access token
        "hs_datawarehouse_sensitive_secret_key"   # New client secret
    ]
}

# Fetch both secrets
secrets_dic = secrets_get_dic(config)

# Variables for HubSpot OAuth
hs_datawarehouse_sensitive_acces_token = secrets_dic["hs_datawarehouse_sensitive_acces_token"]
hs_datawarehouse_sensitive_secret_key = secrets_dic["hs_datawarehouse_sensitive_secret_key"]

print(f"[HUBSPOT CONFIG ℹ️] Access token loaded: {'✅' if hs_datawarehouse_sensitive_acces_token else '❌'}")
print(f"[HUBSPOT CONFIG ℹ️] Client secret loaded: {'✅' if hs_datawarehouse_sensitive_secret_key else '❌'}")

# If you also need the Client ID, add its secret ID to "secrets_list" above:
# "hs_datawarehouse_client_id"

🔹🔹🔹 [START ▶️] Lectura de secretos (in-memory) 🔹🔹🔹
[AUTHENTICATION SUCCESS ✅] Credenciales obtenidas.
[SECRET SUCCESS ✅] 'hs_datawarehouse_sensitive_acces_token' leído.
[SECRET SUCCESS ✅] 'hs_datawarehouse_sensitive_secret_key' leído.
🔹🔹🔹 [METRICS 📊] Resumen 🔹🔹🔹
[METRICS INFO ℹ️] Secretos solicitados: 2
[METRICS INFO ℹ️] Advertencias: 0
[END FINISHED ✅] Tiempo total: 0.2 s
[HUBSPOT CONFIG ℹ️] Access token loaded: ✅
[HUBSPOT CONFIG ℹ️] Client secret loaded: ✅
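
Because `secrets_get_dic` only logs a warning when a secret cannot be read, a missing secret would surface later as a bare `KeyError` on `secrets_dic[...]`. The sketch below shows one way to fail fast with a clearer message. `require_secrets` is a hypothetical helper, not part of the notebook or of `common.dpm_GCP_ini`, and the values used are placeholders.

```python
def require_secrets(secrets_dic: dict, required_ids: list) -> dict:
    """Hypothetical helper: raise if any required secret is missing or empty."""
    missing = [sid for sid in required_ids if not secrets_dic.get(sid)]
    if missing:
        raise KeyError(f"Missing or empty secrets: {missing}")
    # Return only the requested secrets, in the requested order
    return {sid: secrets_dic[sid] for sid in required_ids}


# Usage with placeholder values (not real secrets)
example_dic = {"hs_datawarehouse_sensitive_acces_token": "token-placeholder"}
try:
    require_secrets(example_dic, [
        "hs_datawarehouse_sensitive_acces_token",
        "hs_datawarehouse_sensitive_secret_key",
    ])
except KeyError as err:
    print(f"Validation failed: {err}")
```

Calling it right after `secrets_get_dic(config)` would keep the warning-only behaviour of the reader while making the later variable assignments safe.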


In [32]:
# @title HS_to_GBQ_sensitive_data()
# __________________________________________________________________________________________________________________________________________________________
# HS_to_GBQ_sensitive_data
# __________________________________________________________________________________________________________________________________________________________
def HS_to_GBQ_sensitive_data(config: dict) -> None:
    """
    Extracts HubSpot contacts in chunks of up to 10k contacts (paginating by createdate),
    enriches them with sensitive properties via batch/read, and loads each chunk into BigQuery
    through a temporary CSV in GCS. Authentication is centralized in _ini_authenticate_API().

    Args:
        config (dict):
            # --- Authentication / Environment ---
            - ini_environment_identificated (str): "LOCAL" | "COLAB" | "COLAB_ENTERPRISE" | another GCP environment.
            - json_keyfile_local (str, optional): JSON key path for the LOCAL environment.
            - json_keyfile_colab (str, optional): JSON key path for the COLAB environment.
            - json_keyfile_GCP_secret_id (str, optional): Secret ID holding the JSON key for a GCP environment.
            # --- GCP destinations ---
            - GBQ_project_id (str): Project ID for BigQuery/Storage.
            - GCS_bucket_name (str): Temporary bucket for CSVs.
            - GBQ_dataset_id (str): Destination dataset.
            - GBQ_table_id (str): Destination table (truncated by the first chunk, appended to afterwards).
            # --- HubSpot ---
            - HS_api_key (str): Private App token (or read it from os.environ and pass it here).
            - HS_fields_no_sensitive_names_list (list[str]): NON-sensitive properties for /search ("id" and "createdate" are always added).
            - HS_fields_sensitive_names_list (list[str]): Sensitive properties for /batch/read.
            - HS_api_lines_per_call (int): Page size per /search call (<=100 is reasonable).
            - hs_contact_filter_createdate (dict): {"from": "YYYY-MM-DD", "to": "YYYY-MM-DD|''", "mode": "between"}.
              An empty "to" means today; "mode" is currently not read by the function.

    Returns:
        None

    Raises:
        ValueError: If required parameters are missing or the date range is invalid.
        Exception: If extraction or loading fails after retries.
    """
    # --------------------------- Local imports (reduce global dependencies) -------------------------------------
    import os
    import uuid
    import time
    import math
    import csv
    import pandas as pd
    import requests
    from datetime import datetime, timedelta, timezone
    from google.cloud import bigquery, storage
    # ------------------------------------------------------------------------------------------------------------

    # --------------------------- PARAMETER VALIDATION -----------------------------------------------------------
    required_keys = [
        "GBQ_project_id", "GCS_bucket_name", "GBQ_dataset_id", "GBQ_table_id",
        "HS_api_key", "HS_fields_no_sensitive_names_list", "HS_fields_sensitive_names_list",
        "HS_api_lines_per_call", "hs_contact_filter_createdate"
    ]
    missing = [k for k in required_keys if k not in config]
    if missing:
        raise ValueError(f"[VALIDATION [ERROR ❌]] Faltan claves en config: {missing}")
    # Minimal type checks
    if not isinstance(config["HS_fields_no_sensitive_names_list"], list):
        raise ValueError("[VALIDATION [ERROR ❌]] 'HS_fields_no_sensitive_names_list' debe ser list.")
    if not isinstance(config["HS_fields_sensitive_names_list"], list):
        raise ValueError("[VALIDATION [ERROR ❌]] 'HS_fields_sensitive_names_list' debe ser list.")
    if not isinstance(config["hs_contact_filter_createdate"], dict):
        raise ValueError("[VALIDATION [ERROR ❌]] 'hs_contact_filter_createdate' debe ser dict.")
    # Always include 'id' and 'createdate' among the NON-sensitive properties
    if "id" not in config["HS_fields_no_sensitive_names_list"]:
        config["HS_fields_no_sensitive_names_list"].append("id")
    if "createdate" not in config["HS_fields_no_sensitive_names_list"]:
        config["HS_fields_no_sensitive_names_list"].append("createdate")
    print("🔹🔹🔹 [START ▶️] HS_to_GBQ_sensitive_data (extracción y carga) 🔹🔹🔹", flush=True)

    # --------------------------- BASE PARAMETERS ----------------------------------------------------------------
    GBQ_project_id = config["GBQ_project_id"]
    GCS_bucket_name = config["GCS_bucket_name"]
    GBQ_dataset_id = config["GBQ_dataset_id"]
    GBQ_table_id = config["GBQ_table_id"]
    HS_api_key = config["HS_api_key"]
    hs_fields_no_sensitive = sorted(config["HS_fields_no_sensitive_names_list"])
    hs_fields_sensitive = sorted(config["HS_fields_sensitive_names_list"])
    HS_api_lines_per_call = int(config["HS_api_lines_per_call"])
    createdate_filter = config["hs_contact_filter_createdate"]

    # Dates (an empty 'to' means today at 23:59:59 UTC)
    try:
        from_date = datetime.strptime(createdate_filter["from"], "%Y-%m-%d").replace(tzinfo=timezone.utc)
        if not createdate_filter.get("to"):
            to_date = datetime.now(timezone.utc).replace(hour=23, minute=59, second=59, microsecond=0)
        else:
            to_date = datetime.strptime(createdate_filter["to"], "%Y-%m-%d").replace(
                hour=23, minute=59, second=59, tzinfo=timezone.utc
            )
        if to_date < from_date:
            raise ValueError("Rango de fechas inválido: 'to' < 'from'.")
    except Exception as e:
        raise ValueError(f"[VALIDATION [ERROR ❌]] Fechas inválidas en 'hs_contact_filter_createdate': {e}")

    # --------------------------- CENTRALIZED AUTHENTICATION -----------------------------------------------------
    print("[AUTHENTICATION START ▶️] Inicializando credenciales...", flush=True)
    credentials = _ini_authenticate_API(config, GBQ_project_id)  # uses ini_environment_identificated & json_keyfile_*
    bq_client = bigquery.Client(project=GBQ_project_id, credentials=credentials)
    st_client = storage.Client(project=GBQ_project_id, credentials=credentials)
    print("[AUTHENTICATION SUCCESS ✅] Credenciales listas.", flush=True)

    # --------------------------- INTERNAL HELPERS ---------------------------------------------------------------
    def _iso_z(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

    def _hubspot_search_chunk(start_dt: datetime, end_dt: datetime, limit_chunk: int = 10_000):
        """Returns (rows_list, last_createdate_dt, reached_limit_bool), with basic backoff."""
        url = "https://api.hubapi.com/crm/v3/objects/contacts/search"
        headers = {"Authorization": f"Bearer {HS_api_key}", "Content-Type": "application/json"}
        accumulated, after = [], None
        # paginated loop until the chunk is full
        while len(accumulated) < limit_chunk:
            remaining = limit_chunk - len(accumulated)
            body = {
                "filterGroups": [{
                    "filters": [
                        {"propertyName": "createdate", "operator": "GTE", "value": _iso_z(start_dt)},
                        {"propertyName": "createdate", "operator": "LTE", "value": _iso_z(end_dt)}
                    ]
                }],
                "sorts": [{"propertyName": "createdate", "direction": "ASCENDING"}],
                "properties": hs_fields_no_sensitive,
                "limit": min(HS_api_lines_per_call, remaining)
            }
            if after:
                body["after"] = after
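            # simple backoff; only transient errors (429 / 5xx) are retried
            retry, max_retry, backoff = 0, 5, 1.2
            while True:
                resp = requests.post(url, headers=headers, json=body, timeout=60)
                if resp.status_code == 200:
                    break
                if resp.status_code != 429 and resp.status_code < 500:
                    # other client errors (e.g. missing scopes) cannot succeed on retry
                    raise requests.HTTPError(f"/search HTTP {resp.status_code} (no retry): {resp.text}")
                retry += 1
                if retry > max_retry:
                    raise requests.HTTPError(f"/search fallo tras {max_retry} reintentos: {resp.text}")
                time.sleep(backoff ** retry)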
            data = resp.json()
            results = data.get("results", [])
            if not results:
                break
            for r in results:
                props = r.get("properties", {})
                row = {c: (r.get("id") if c == "id" else props.get(c)) for c in hs_fields_no_sensitive}
                accumulated.append(row)
            after = data.get("paging", {}).get("next", {}).get("after")
            if not after:
                break

        if not accumulated:
            return [], None, False

        last_cdate_str = accumulated[-1].get("createdate")
        try:
            # HubSpot returns ISO timestamps with a trailing 'Z'
            last_dt = datetime.fromisoformat(last_cdate_str.replace("Z", "+00:00")).astimezone(timezone.utc) if last_cdate_str else None
        except Exception:
            last_dt = None
        reached = (len(accumulated) >= limit_chunk)
        return accumulated, last_dt, reached

    def _hubspot_batch_sensitive(contact_rows: list) -> pd.DataFrame:
        if not hs_fields_sensitive:
            print("[PROCESSING INFO ℹ️] Sin propiedades sensibles; devolviendo NO sensibles.", flush=True)
            return pd.DataFrame(contact_rows)
        if not contact_rows:
            print("[PROCESSING INFO ℹ️] Lista vacía de contactos; nada que enriquecer.", flush=True)
            return pd.DataFrame([])

        ids = [r.get("id") for r in contact_rows if r.get("id")]
        if not ids:
            print("[PROCESSING WARNING ⚠️] No hay IDs; se omite batch/read.", flush=True)
            return pd.DataFrame(contact_rows)

        url_b = "https://api.hubapi.com/crm/v3/objects/contacts/batch/read"
        headers = {"Authorization": f"Bearer {HS_api_key}", "Content-Type": "application/json"}

        id_to_sens = {}
        chunk_size = 100
        total = len(ids)
        n_chunks = math.ceil(total / chunk_size)
        print(f"[PROCESSING INFO ℹ️] Recuperando {len(hs_fields_sensitive)} propiedades sensibles para {total} IDs en {n_chunks} lotes.", flush=True)

        for i in range(n_chunks):
            subset = ids[i*chunk_size : (i+1)*chunk_size]
            body_b = {"properties": hs_fields_sensitive, "inputs": [{"id": cid} for cid in subset]}
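            # simple backoff; only transient errors (429 / 5xx) are retried
            retry, max_retry, backoff = 0, 5, 1.2
            while True:
                resp_b = requests.post(url_b, headers=headers, json=body_b, timeout=60)
                if resp_b.status_code == 200:
                    break
                if resp_b.status_code != 429 and resp_b.status_code < 500:
                    # other client errors (e.g. missing scopes) cannot succeed on retry
                    raise requests.HTTPError(f"/batch/read HTTP {resp_b.status_code} (no retry): {resp_b.text}")
                retry += 1
                if retry > max_retry:
                    raise requests.HTTPError(f"/batch/read fallo tras {max_retry} reintentos: {resp_b.text}")
                time.sleep(backoff ** retry)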

            data_b = resp_b.json()
            for rb in data_b.get("results", []):
                c_id = rb.get("id")
                p_b = rb.get("properties", {}) or {}
                id_to_sens[c_id] = {s: p_b.get(s) for s in hs_fields_sensitive}
            print(f"[PROCESSING INFO ℹ️] Lote {i+1}/{n_chunks} procesado.", flush=True)

        df = pd.DataFrame(contact_rows)
        for sfield in hs_fields_sensitive:
            df[sfield] = df["id"].apply(lambda cid: (id_to_sens.get(cid) or {}).get(sfield))
        return df

    def _upload_df_to_bq(df: pd.DataFrame, disposition: str = "WRITE_APPEND"):
        if df.empty:
            print("[LOAD WARNING ⚠️] DataFrame vacío; se omite carga.", flush=True)
            return 0

        # Temporary local CSV -> GCS -> BQ
        tmp_local = f"temp_contacts_{uuid.uuid4().hex}.csv"
        df.to_csv(tmp_local, index=False, quoting=csv.QUOTE_MINIMAL)
        blob_name = f"tmp_hs/contacts_{uuid.uuid4().hex}.csv"

        try:
            bucket = st_client.bucket(GCS_bucket_name)
            blob = bucket.blob(blob_name)
            print(f"[LOAD INFO ℹ️] Subiendo CSV temporal a gs://{GCS_bucket_name}/{blob_name} ...", flush=True)
            blob.upload_from_filename(tmp_local)

            table_id = f"{GBQ_project_id}.{GBQ_dataset_id}.{GBQ_table_id}"
            job_config = bigquery.LoadJobConfig(
                write_disposition=disposition,
                source_format=bigquery.SourceFormat.CSV,
                autodetect=True,
                field_delimiter=",",
                quote_character='"',
                allow_quoted_newlines=True,
                labels={"source": "hubspot", "pipeline": "hs_sensitive", "mode": "batch"}
            )
            print(f"[LOAD START ▶️] Cargando en BigQuery => {table_id} ...", flush=True)
            load_job = bq_client.load_table_from_uri(
                f"gs://{GCS_bucket_name}/{blob_name}",
                table_id,
                job_config=job_config,
            )
            load_job.result()
            rows_loaded = len(df)
            print(f"[LOAD SUCCESS ✅] Carga completada ({rows_loaded} filas).", flush=True)
            return rows_loaded
        finally:
            # Best-effort cleanup of the temporary blob and local file
            try:
                if 'blob' in locals():
                    blob.delete()
            except Exception:
                print("[LOAD WARNING ⚠️] No se pudo eliminar el blob temporal en GCS.", flush=True)
            try:
                if os.path.exists(tmp_local):
                    os.remove(tmp_local)
            except Exception:
                print("[LOAD WARNING ⚠️] No se pudo eliminar el CSV temporal local.", flush=True)

    # --------------------------- MAIN LOOP OVER CHUNKS ----------------------------------------------------------
    total_rows, total_chunks = 0, 0
    t0 = time.time()
    current_start = from_date
    print(f"[PROCESSING INFO ℹ️] Ventana: {from_date} → {to_date}", flush=True)
    print(f"[PROCESSING INFO ℹ️] NO sensibles: {hs_fields_no_sensitive}", flush=True)
    print(f"[PROCESSING INFO ℹ️] Sensibles: {hs_fields_sensitive}", flush=True)

    while current_start <= to_date:
        total_chunks += 1
        # _iso_z avoids the malformed "+00:00Z" suffix that isoformat() + "Z" produces
        print(f"--- Chunk #{total_chunks} | desde {_iso_z(current_start)} ---", flush=True)

        rows, last_dt, reached = _hubspot_search_chunk(current_start, to_date, limit_chunk=10_000)
        if not rows:
            print("[PROCESSING INFO ℹ️] Sin resultados en este tramo; fin del proceso.", flush=True)
            break

        print(f"[PROCESSING INFO ℹ️] {len(rows)} contactos base. Enriqueciendo sensibles...", flush=True)
        df_chunk = _hubspot_batch_sensitive(rows)

        # First load: TRUNCATE. Subsequent loads: APPEND.
        disposition = "WRITE_TRUNCATE" if total_chunks == 1 else "WRITE_APPEND"
        total_rows += _upload_df_to_bq(df_chunk, disposition=disposition)

        if not last_dt:
            print("[PROCESSING WARNING ⚠️] 'last createdate' indeterminado; se detiene paginación.", flush=True)
            break
        # Advance 1 second past the last paginated 'createdate'.
        # Caveat: contacts created within that same second but beyond the chunk limit would be skipped.
        current_start = last_dt + timedelta(seconds=1)

        if not reached:
            print("[PROCESSING INFO ℹ️] No se alcanzó el tope de 10k; no hay más contactos en rango.", flush=True)
            break

    # --------------------------- FINAL METRICS ------------------------------------------------------------------
    elapsed = round(time.time() - t0, 2)
    print("🔹🔹🔹 [METRICS 📊] Resumen de ejecución 🔹🔹🔹", flush=True)
    print(f"[METRICS INFO ℹ️] Proyecto: {GBQ_project_id}", flush=True)
    print(f"[METRICS INFO ℹ️] Dataset.Tabla: {GBQ_dataset_id}.{GBQ_table_id}", flush=True)
    print(f"[METRICS INFO ℹ️] Chunks procesados: {total_chunks}", flush=True)
    print(f"[METRICS INFO ℹ️] Filas cargadas: {total_rows}", flush=True)
    print(f"[END FINISHED ✅] Tiempo total: {elapsed} s", flush=True)
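
The chunking loop in `HS_to_GBQ_sensitive_data` can be illustrated without calling HubSpot. The sketch below is a minimal, hypothetical model: `fetch_window` and `iterate_chunks` are illustrative names that do not exist in the notebook, and an in-memory list stands in for the /search endpoint. It shows the cursor moving one second past the last `createdate` of a full chunk, and why contacts sharing that last second can be missed.

```python
from datetime import datetime, timedelta, timezone


def fetch_window(records, start_dt, end_dt, limit):
    """Hypothetical stand-in for /search: records in [start_dt, end_dt], oldest first, capped."""
    hits = [r for r in records if start_dt <= r["createdate"] <= end_dt]
    hits.sort(key=lambda r: r["createdate"])
    return hits[:limit]


def iterate_chunks(records, start_dt, end_dt, limit):
    """Collect chunks by advancing a createdate cursor, mirroring the main loop."""
    chunks, cursor = [], start_dt
    while cursor <= end_dt:
        rows = fetch_window(records, cursor, end_dt, limit)
        if not rows:
            break
        chunks.append(rows)
        if len(rows) < limit:
            break  # chunk not full: nothing left in the window
        cursor = rows[-1]["createdate"] + timedelta(seconds=1)
    return chunks


base = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = base + timedelta(seconds=10)

# One contact per second: every contact is returned
unique = [{"id": str(i), "createdate": base + timedelta(seconds=i)} for i in range(5)]
print([len(c) for c in iterate_chunks(unique, base, end, limit=2)])

# Two contacts share second 1; the chunk boundary falls between them
shared = [{"id": str(i), "createdate": base + timedelta(seconds=s)}
          for i, s in enumerate([0, 1, 1, 2])]
loaded = sum(len(c) for c in iterate_chunks(shared, base, end, limit=2))
print(f"{loaded} of {len(shared)} contacts loaded")
```

If that edge case matters, one option is to restart the next chunk at the last `createdate` itself and deduplicate by `id`, at the cost of re-reading a few rows.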


# EXECUTIONS

In [33]:
# @title IMPORT SENSITIVE DATA FROM HS TO GBQ
config = {
    "ini_environment_identificated": ini_environment_identificated,
    "json_keyfile_local": GCP_json_keyfile_local,
    "json_keyfile_colab": GCP_json_keyfile_colab,
    "json_keyfile_GCP_secret_id": GCP_json_keyfile_GCP_secret_id,

    "GBQ_project_id": "animum-dev-datawarehouse",
    "GCS_bucket_name": "temp_datawarehouse",
    "GBQ_dataset_id": "tp_02st_01",
    "GBQ_table_id": "hs_contact_sensitive_cleaned",

    # HS_api_key_str is not defined in this notebook; this assumes the token read from Secret Manager above is the one intended
    "HS_api_key": hs_datawarehouse_sensitive_acces_token,
    "HS_fields_no_sensitive_names_list": ["email"],
    "HS_fields_sensitive_names_list": ["iban","codigo_bic_swift","documento_nacional_de_identidad_numero"],
    "HS_api_lines_per_call": 100,
    "hs_contact_filter_createdate": {"from": "2025-01-01", "to": "2025-08-29", "mode": "between"}
}

HS_to_GBQ_sensitive_data(config)

🔹🔹🔹 [START ▶️] HS_to_GBQ_sensitive_data (extracción y carga) 🔹🔹🔹
[AUTHENTICATION START ▶️] Inicializando credenciales...
[AUTHENTICATION SUCCESS ✅] Credenciales listas.
[PROCESSING INFO ℹ️] Ventana: 2025-01-01 00:00:00+00:00 → 2025-08-29 23:59:59+00:00
[PROCESSING INFO ℹ️] NO sensibles: ['createdate', 'email', 'id']
[PROCESSING INFO ℹ️] Sensibles: ['codigo_bic_swift', 'documento_nacional_de_identidad_numero', 'iban']
--- Chunk #1 | desde 2025-01-01T00:00:00+00:00Z ---
[PROCESSING INFO ℹ️] 6768 contactos base. Enriqueciendo sensibles...
[PROCESSING INFO ℹ️] Recuperando 3 propiedades sensibles para 6768 IDs en 68 lotes.


HTTPError: /batch/read fallo tras 5 reintentos: {"status":"error","message":"This app hasn't been granted all required scopes to make this call. Read more about required scopes here: https://developers.hubspot.com/scopes.","correlationId":"d8f2dacb-6f75-40e4-958e-8b46812ba7dc","errors":[{"message":"One or more of the following scopes are required.","context":{"requiredGranularScopes":["crm.objects.contacts.sensitive.read.v2","crm.objects.contacts.highly_sensitive.read.v2"]}}],"links":{"scopes":"https://developers.hubspot.com/scopes"},"category":"MISSING_SCOPES"}
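
The failure above is a permissions problem rather than a transient one, so repeating the call cannot help. The sketch below shows one way to read the missing scopes out of such an error body and to decide whether a status code is worth retrying. `missing_scopes`, `is_retryable` and `RETRYABLE_STATUS` are hypothetical names, the set of retryable codes is an assumption, and the sample body is a trimmed copy of the logged payload (the log does not show the HTTP status code).

```python
import json

# Assumption: only rate limiting and server-side errors are worth retrying
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def is_retryable(status_code: int) -> bool:
    """True when a later attempt could plausibly succeed."""
    return status_code in RETRYABLE_STATUS


def missing_scopes(error_body: str) -> list:
    """Return the scopes listed in a MISSING_SCOPES error body, or [] otherwise."""
    try:
        payload = json.loads(error_body)
    except json.JSONDecodeError:
        return []
    if not isinstance(payload, dict) or payload.get("category") != "MISSING_SCOPES":
        return []
    scopes = []
    for err in payload.get("errors", []):
        scopes.extend(err.get("context", {}).get("requiredGranularScopes", []))
    return scopes


# Trimmed copy of the error body logged above
sample_body = json.dumps({
    "status": "error",
    "category": "MISSING_SCOPES",
    "errors": [{
        "message": "One or more of the following scopes are required.",
        "context": {"requiredGranularScopes": [
            "crm.objects.contacts.sensitive.read.v2",
            "crm.objects.contacts.highly_sensitive.read.v2",
        ]},
    }],
})

for scope in missing_scopes(sample_body):
    print(f"Grant one of these scopes to the HubSpot app: {scope}")
```

According to the error message, granting at least one of the listed scopes to the app behind the token is what unblocks the `/batch/read` call for sensitive properties.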