In [None]:
import json
import os
import requests
from dotenv import load_dotenv
from time import time
# Silence the warnings emitted for HTTPS requests made without certificate verification
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [None]:
# Load the variables defined in the .env file.
# override=True lets the .env values win over variables that already exist in
# the process environment. By default load_dotenv() leaves existing variables
# untouched, and USER is normally pre-set by the shell on Unix-like systems,
# so without the override the NiFi user from .env would be silently ignored.
load_dotenv(override=True)

In [None]:
# NiFi credentials taken from the process environment; either one is None
# when the corresponding variable is not set.
USER = os.environ.get("USER")
PASSWORD = os.environ.get("PASSWORD")

In [None]:
def get_token(url_nifi_api: str, access_payload: dict, verify=False) -> str:
    """
    Retrieve a JWT token by authenticating the user.

    Makes use of the REST API `/access/token`.

    :param url_nifi_api: the basic URL to the NiFi API. The endpoint path is
                         appended directly, so it should end with '/'.
    :param access_payload: dictionary with keys 'username' & 'password' and
                           fitting values; sent as a form-encoded body.
    :param verify: whether to verify the SSL certificate.
    :raises requests.HTTPError: if the API answers with a 4xx/5xx status.
    :return: JWT Token
    """

    header = {
        "Accept-Encoding": "gzip, deflate, br",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "*/*",
    }
    response = requests.post(
        url_nifi_api + "access/token",
        headers=header,
        data=access_payload,
        verify=verify,
    )
    # Fail loudly on rejected credentials or server errors; otherwise the
    # error body would be returned and later used as if it were a token.
    response.raise_for_status()
    return response.content.decode("ascii")

In [None]:
def get_processor(url_nifi_api: str, processor_id: str, token=None, verify=False):
    """
    Gets and returns a single processor.

    Makes use of the REST API `/processors/{processor_id}`.

    :param url_nifi_api: base URL of the NiFi API. The endpoint path is
                         appended directly, so it should end with '/'.
    :param processor_id: id of the processor to fetch.
    :param token: JWT access token; when None no Authorization header is sent.
    :param verify: whether to verify the SSL certificate.
    :returns: the response body parsed from JSON.
    """

    header = {"Content-Type": "application/json"}
    # Only authenticate when a token was supplied: formatting None would send
    # the literal header value "Bearer None".
    if token is not None:
        header["Authorization"] = "Bearer {}".format(token)

    # GET processor and parse to JSON
    response = requests.get(
        url_nifi_api + f"processors/{processor_id}", headers=header, verify=verify
    )
    return json.loads(response.content)

In [None]:
def update_processor_status(processor_id: str, new_state: str, token, url_nifi_api, verify=False):
    """Change the run status of a processor.

    Retrieves the processor to get its current revision and then PUTs a JSON
    document with that revision and the desired state to
    `/processors/{processor_id}/run-status`.

    :param processor_id: Id of the processor to receive the new state.
    :param new_state: String representing the new state, e.g. STOPPED,
                      RUNNING or RUN_ONCE.
    :param token: a JWT access token for NiFi.
    :param url_nifi_api: URL to the NiFi API
    :param verify: whether to verify the SSL certificate.
    :return: the response object of the PUT request.
    """

    # Retrieve processor from `/processors/{processor_id}`. `verify` is
    # forwarded so both requests use the same certificate policy.
    processor = get_processor(url_nifi_api, processor_id, token, verify=verify)

    # Create a JSON with the new state and the processor's revision
    put_dict = {
        "revision": processor["revision"],
        "state": new_state,
        "disconnectedNodeAcknowledged": True,
    }

    # Dump JSON and PUT the new run status
    payload = json.dumps(put_dict).encode("utf8")
    header = {
        "Content-Type": "application/json",
        "Authorization": "Bearer {}".format(token),
    }
    response = requests.put(
        url_nifi_api + f"processors/{processor_id}/run-status",
        headers=header,
        data=payload,
        verify=verify,
    )
    return response

In [None]:
def get_processor_state(url_nifi_api: str, processor_id: str, token=None, verify=False):
    """
    Fetch the state of a single processor.

    Makes use of the REST API 'processors/{processor_id}/state'.

    :param url_nifi_api: base URL of the NiFi API (endpoint path is appended directly).
    :param processor_id: id of the processor whose state is requested.
    :param token: JWT access token; when None no Authorization header is sent.
    :param verify: whether to verify the SSL certificate.
    :returns: the response body parsed from JSON.
    """

    # Start from the common header and authenticate only when a token exists
    request_headers = {"Content-Type": "application/json"}
    if token is not None:
        request_headers["Authorization"] = f"Bearer {token}"

    endpoint = url_nifi_api + f"processors/{processor_id}/state"
    reply = requests.get(endpoint, headers=request_headers, verify=verify)
    return json.loads(reply.content)

In [None]:
def clear_processor_state(url_nifi_api: str, processor_id: str, token, verify=False):
    """
    Ask NiFi to clear the state of a processor.

    Makes use of the REST API 'processors/{processor_id}/state/clear-requests'.

    :param url_nifi_api: base URL of the NiFi API (endpoint path is appended directly).
    :param processor_id: id of the processor whose state is cleared.
    :param token: JWT access token
    :param verify: whether to verify the SSL certificate.
    :returns: Response object of the POST request.
    """
    endpoint = url_nifi_api + f"processors/{processor_id}/state/clear-requests"
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }

    # The request carries no body; the endpoint itself identifies the action
    return requests.post(endpoint, headers=request_headers, verify=verify)

In [None]:
def parse_state(json_obj, state_key: str):
    """
    Look up one state entry by key in a processor state document.

    Only the entries under ``componentState -> localState -> state`` are
    searched; the first entry whose "key" matches wins.

    :param json_obj: the processor's general state.
    :param state_key: the key for the specific state.
    :raises ValueError: if the passed key cannot be found in the processor state.
    :return: value of the matching key.
    """
    entries = json_obj["componentState"]["localState"]["state"]

    # A private sentinel keeps a legitimately stored None distinguishable
    # from "no entry matched".
    not_found = object()
    match = next(
        (entry["value"] for entry in entries if entry["key"] == state_key),
        not_found,
    )
    if match is not_found:
        raise ValueError(f"Could not find {state_key} ")
    return match

In [None]:
def pause(secs):
    """
    Block the caller for roughly ``secs`` seconds.

    :param secs: number of seconds to wait; zero or negative values return
                 immediately.
    :return: None
    """
    # Imported locally because the module only imports `time` (the function)
    # from the time module.
    from time import sleep

    # sleep() yields the CPU instead of spinning in a busy loop; the guard
    # keeps the immediate return for non-positive durations, which sleep()
    # would reject with ValueError when negative.
    if secs > 0:
        sleep(secs)

In [None]:
def get_counters(url_nifi_api: str, token=None, verify=False):
    """
    Fetch all NiFi counters through the REST API.

    Makes use of the REST API `/counters`.

    :param url_nifi_api: base URL of the NiFi API (e.g. "http://localhost:8080/nifi-api/")
    :param token: JWT access token (optional when there is no authentication);
                  when None no Authorization header is sent.
    :param verify: whether to verify the SSL certificate
    :raises Exception: if the API answers with a status other than 200.
    :returns: JSON object with all the counters
    """

    header = {"Content-Type": "application/json"}
    # The token is documented as optional: skip the header instead of sending
    # the literal value "Bearer None".
    if token is not None:
        header["Authorization"] = "Bearer {}".format(token)

    # Request the counters and convert them to JSON
    response = requests.get(
        url_nifi_api + "counters",
        headers=header,
        verify=verify,
    )

    if response.status_code != 200:
        raise Exception(f"Error al obtener contadores: {response.status_code} - {response.text}")
    return json.loads(response.content)

def get_counter_value_by_id(counters_response: dict, counter_id: str):
    """
    Return the value of one counter, looked up by id in a counters response.

    The aggregate snapshot is used when present; the per-node snapshots are
    consulted only when there is no aggregate snapshot at all.

    :param counters_response: full JSON response of the counters API
    :param counter_id: id of the counter to look for
    :returns: value of the first matching counter, or None if none matches
    """
    report = counters_response["counters"]

    if "aggregateSnapshot" in report:
        candidates = report["aggregateSnapshot"]["counters"]
    elif "nodeSnapshots" in report:
        # Flatten lazily so nodes are visited in order and only as far as needed
        candidates = (
            entry
            for node in report["nodeSnapshots"]
            for entry in node["snapshot"]["counters"]
        )
    else:
        candidates = ()

    return next(
        (entry["value"] for entry in candidates if entry["id"] == counter_id),
        None,
    )

def reset_counter(url_nifi_api: str, counter_id: str, token, verify=False):
    """
    Reset a single counter to 0.

    Sends a PUT request to `counters/{counter_id}`.

    :param url_nifi_api: base URL of the NiFi API
    :param counter_id: id of the counter to reset
    :param token: JWT access token
    :param verify: whether to verify the SSL certificate
    :returns: response object of the PUT request
    """
    endpoint = url_nifi_api + f"counters/{counter_id}"
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    # JSON body describing the counter with its value set back to zero
    body = json.dumps({"counter": {"value": 0}})

    return requests.put(endpoint, headers=request_headers, data=body, verify=verify)

In [None]:
# Form fields sent to the token endpoint
# (e.g. retrieve via Airflow's `BaseHook` functionality)
access_payload = dict(username=USER, password=PASSWORD)

# Base URL of the NiFi REST API; endpoint paths are appended to it directly
# (e.g. retrieve via Airflow's `BaseHook` functionality)
url_nifi_api = "https://192.9.200.15:8443/nifi-api/"

In [None]:
# Authenticate once; the resulting JWT is reused by the cells below
token = get_token(
    url_nifi_api=url_nifi_api,
    access_payload=access_payload,
    verify=False,
)

In [None]:
# Fetch every counter; the bare name on the last line makes the notebook
# render the raw response.
todos_contadores = get_counters(url_nifi_api, token=token, verify=False)
todos_contadores

In [None]:
# Fetch the counters
todos_contadores = get_counters(url_nifi_api, token=token, verify=False)

# Build one (id, name) tuple per counter listed in the aggregate snapshot
counter_configs = [
    (entry['id'], entry['name'])
    for entry in todos_contadores['counters']['aggregateSnapshot']['counters']
]

print(f"Total de contadores: {len(counter_configs)}")

In [None]:
def _normalize_counter_config(config, position):
    """Turn one counter configuration into a {'id_counter', 'name'} dict.

    :param config: a (id_counter, name) tuple, a dict with key 'id_counter'
                   (and optionally 'name'), or a bare id string.
    :param position: 1-based position of the configuration, used to build the
                     automatic name 'Counter_<position>'.
    :raises ValueError: if the configuration has none of the accepted shapes.
    :return: dict with keys 'id_counter' and 'name'.
    """
    if isinstance(config, tuple) and len(config) == 2:
        return {'id_counter': config[0], 'name': config[1]}
    if isinstance(config, dict):
        return {
            'id_counter': config['id_counter'],
            'name': config.get('name', f'Counter_{position}'),
        }
    if isinstance(config, str):  # Only an id: generate the name automatically
        return {'id_counter': config, 'name': f'Counter_{position}'}
    raise ValueError(f"Invalid counter configuration: {config}. Expected tuple (id_counter, name), dict, or string.")


def prepare_counter(*counter_configs, **kwargs):
    """Prepare the NiFi counters for the workflow.

    @DAVILA 24-07-2025 - Improved with descriptive names.

    Resets every given counter so it is ready for the workflow. The resets go
    through `reset_counter` using the module-level `url_nifi_api` and `token`,
    which must be defined before this function is called.

    :param *counter_configs: (id_counter, name) tuples, dicts with key
                             'id_counter' (and optionally 'name'), or bare
                             id strings.
    :param **kwargs: kept for compatibility with calls that pass
                     counter_configs=...; a list is used as-is, any other
                     value is treated as a single configuration.
    :raises ValueError: if a configuration has none of the accepted shapes.
    :return: None
    """
    # The keyword form takes precedence over positional configurations
    if 'counter_configs' in kwargs:
        supplied = kwargs['counter_configs']
        configs_to_process = supplied if isinstance(supplied, list) else [supplied]
    else:
        configs_to_process = counter_configs

    # Validate everything up front so no counter is reset when any
    # configuration is invalid.
    normalized_configs = [
        _normalize_counter_config(config, position)
        for position, config in enumerate(configs_to_process, start=1)
    ]

    print(f"Preparing {len(normalized_configs)} counter(s) for workflow...")

    failed = []
    for config in normalized_configs:
        id_counter = config['id_counter']
        name = config['name']

        # Reset the counter
        response = reset_counter(url_nifi_api, id_counter, token, verify=False)
        if response.ok:
            print(f"{response.status_code} - {response.reason} - {name} counter ({id_counter}) reset")
        else:
            failed.append(str(name))
            print(f"{response.status_code} - {response.reason} - {name} counter ({id_counter}) reset FAILED")

    # Only claim success when every reset request was accepted
    if failed:
        print(f"WARNING: {len(failed)} of {len(normalized_configs)} counter(s) could not be reset: {', '.join(failed)}")
    else:
        print("All counters have been prepared successfully!")

In [None]:
# Reset ALL counters
#prepare_counter(*counter_configs)

In [None]:
# Id of the NiFi processor referenced by the commented-out cells below
procesador = "19cea901-ddc2-3ce6-c087-9d6e0e5bf4d2"

In [None]:
#run_once = update_processor_status(procesador, "RUN_ONCE", token, url_nifi_api)

In [None]:
#run_once.status_code, run_once.text