In [192]:
import json
import networkx as nx
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
import matplotlib.pyplot as plt
import time

Funciones a llamar:

In [193]:

def plot_dag(dag):
    """
    Grafica el DAG de manera estructurada con pygraphviz.
    """
    # Crear un grafo de pygraphviz
    ag = nx.nx_agraph.to_agraph(dag)

    # Etiquetar nodos con el schemaId
    for node in dag.nodes:
        ag.get_node(node).attr['label'] = dag.nodes[node]['schemaId']

    # Configurar dirección del flujo
    ag.graph_attr.update(rankdir='LR', splines='true', overlap='false')

    # Dibujar el grafo
    plt.figure(figsize=(12, 8))
    ag.draw('/tmp/dag_output.png', prog='dot')  # Usa dot para una disposición estructurada
    img = plt.imread('/tmp/dag_output.png')
    plt.imshow(img)
    plt.axis('off')
    plt.show()


## Funciones de bloques de Safe:
def alarm_manager(inputs):
    alarm_selection = inputs["alarm-selection"]
    print(f'ejecutado alarm_manager con alarma: {alarm_selection}')
    return f"Output of alarm_manager with alarm={alarm_selection}"


def notification_on_site(inputs):
    notification_type = inputs.get("notification-type", "default")
    print(f'ejecutado notification_on_site con tipo: {notification_type}')
    return f"Output of notification_on_site with type={notification_type}"

def filter_node(inputs):
    input_detections = inputs["input-detections"]
    filter_type = inputs["input-filter-type"]
    operator = inputs["input-operator"]
    value = inputs["input-value"]
    
    print(f'ejecutado filter_node con tipo: {filter_type}, operador: {operator}, valor: {value}')
    dets_true = []
    dets_false = []
    for det in input_detections:
        if det['object'] == value:  # Asumiendo que queremos filtrar por tipo de objeto
            dets_true.append(det)
        else:
            dets_false.append(det)
    return {"output-filtered-detections-pass": dets_true, "output-filtered-detections-fail": dets_false}

def ai_node(inputs):
    """
    aqui pediria detecciones a la api para una camara dada
    """
    try:
        with open('fake_dets/dets_response.json', 'r') as f:
            detections_dict = json.load(f)
    except FileNotFoundError:
        print("Error: Could not find detections file")
        return {"camera_detections": []}
    except json.JSONDecodeError:
        print("Error: Invalid JSON format in detections file")
        return {"camera_detections": []}
    
    camera_name = inputs["camera-name"]
    print(f'ejecutado ai_node para camara: {camera_name}')
    try:
        return {"camera_detections": detections_dict['detections']['cameras'][camera_name]}
    except KeyError:
        print(f'No hay camara con nombre {camera_name}')
        return {"camera_detections": []}

def count_node(inputs):
    detections = inputs["input-detections"]
    threshold = int(inputs["input-threshold"])
    comparator = inputs["input-comparator"]
    
    count = len(detections)
    print(f'ejecutado count_node con {count} detecciones, comparador {comparator}, threshold {threshold}')
    
    if comparator == ">":
        if count > threshold:
            return {"output-alarm-trigger": detections, "trigger-type": True}
        else:
            return {"output-alarm-trigger": detections, "trigger-type": False}

    elif comparator == "<":
        if  count < threshold:
            return {"output-alarm-trigger": detections, "trigger-type": True}
        else:
            return {"output-alarm-trigger": detections, "trigger-type": False}
    elif comparator == ">=":
        if count >= threshold:
            return {"output-alarm-trigger": detections, "trigger-type": True}
        else:
            return {"output-alarm-trigger": detections, "trigger-type": False}
    elif comparator == "<=":
        if count <= threshold:
            return {"output-alarm-trigger": detections, "trigger-type": True}
        else:
            return {"output-alarm-trigger": detections, "trigger-type": False}
    else:  # "=="
        if count == threshold:
            return {"output-alarm-trigger": detections, "trigger-type": True}
        else:
            return {"output-alarm-trigger": detections, "trigger-type": False}

def camera_node(inputs):
    camera_name = inputs["input-camera-name"]
    print(f'ejecutado camera_node con camera: {camera_name}')
    return {'output-image-name':camera_name}

def distance_node(inputs):
    distance_threshold = inputs.get("distance-threshold", 2.0)
    print(f'ejecutado distance_node con threshold: {distance_threshold}')
    return f"Output of distance_node with threshold={distance_threshold}"

def start_cycle_node(inputs):
    print('ejecutado start_cycle_node')
    return True

def notification_off_site(inputs):
    notification_type = inputs["notification-type"]
    telegram_phone = inputs["telegram-phone"]
    telegram_message = inputs["telegram-message"]
    telegram_image = inputs["telegram-image"]
    
    print(f'ejecutado notification_off_site con tipo: {notification_type}, telefono: {telegram_phone}')
    return f"Output of notification_off_site with type={notification_type}"



Build DAG.

In [194]:

def build_dag(json_path: str, plot: bool = False):
    with open(json_path, 'r') as f:
        flow = json.load(f)
    # Crear el grafo
    dag = nx.DiGraph()

    # Almacenar las funciones disponibles
    functions = {
        "alarm-manager": alarm_manager,
        "notification-on-site": notification_on_site,
        "filter-node": filter_node,
        "ai-node": ai_node,
        "count-node": count_node,
        "camera-node": camera_node,
        "notification-off-site": notification_off_site,
        "start-cycle-node": start_cycle_node,
        "distance-node": distance_node
    }

    for block in flow:
        node_id = block['id']
        func_name = block['schemaId']  # El nombre de la función está en 'schemaId'
        func = functions.get(func_name)
        outputs = block.get('outputs', [])
        if not func:
            raise ValueError(f"Función {func_name} no definida en el mapa de funciones")
        inputs = {inp['inputId']: inp['value'] for inp in block.get('inputs', [])}
        outputs = {output['outputId']: None for output in outputs}
        connected_inputs = {input['targetHandle']['nodeId']: {'source':input['source'], 'node_output': input['sourceHandle']['nodeId']} for input in block.get('connectedTo', {}).get('inputs', [])}
        # Agregar el nodo al DAG
        dag.add_node(node_id, 
                     func=func,
                     schemaId=func_name,
                     inputs=inputs,
                     outputs=outputs,
                     connected_inputs=connected_inputs)
        print('func_name', func_name, '\n \n inputs', inputs, '\n \n outputs', outputs, '\n \n conected_inputs: ', connected_inputs)
        print('mmmmmmmmmmmmmmmmmmmmm')
        # Agregar conexiones desde los nodos de entrada
        for connection in block.get('connectedTo', {}).get('inputs', []):
            source_node = connection['source']
            dag.add_edge(source_node, node_id)

        # Agregar conexiones hacia los nodos de salida
        for connection in block.get('connectedTo', {}).get('outputs', []):
            target_node = connection['target']
            dag.add_edge(node_id, target_node)

    # Verificar que sea un DAG válido
    if not nx.is_directed_acyclic_graph(dag):
        raise ValueError("El flujo no es un DAG válido")
    if plot:
        plot_dag(dag)
    return dag

# Usage




In [195]:
FILE_JSON = "flows/workflow_edit.json"

In [196]:
my_dag = build_dag(FILE_JSON, plot = False)

func_name alarm-manager 
 
 inputs {'alarm-selection': 'alarma_fuego', 'cooldown': '3', 'sensitivity': '23'} 
 
 outputs {} 
 
 conected_inputs:  {'alarm-selection': {'source': 'WVOSg_nGK-pNCveO409pC', 'node_output': 'output-alarm-trigger'}}
mmmmmmmmmmmmmmmmmmmmm
func_name filter-node 
 
 inputs {'input-detections': 'ai-node-3-camera_detections', 'input-filter-type': 'clase', 'input-operator': '==', 'input-value': 'person'} 
 
 outputs {'output-filtered-detections-pass': None, 'output-filtered-detections-fail': None} 
 
 conected_inputs:  {'input-detections': {'source': 'HmqswRo8MT9u3sDogYiiC', 'node_output': 'camera_detections'}}
mmmmmmmmmmmmmmmmmmmmm
func_name ai-node 
 
 inputs {'camera-name': 'camera-node-2-output-image-name'} 
 
 outputs {'camera_detections': None} 
 
 conected_inputs:  {'camera-name': {'source': 'yhtLPhFYEDOkfObJlH9L7', 'node_output': 'output-image-name'}}
mmmmmmmmmmmmmmmmmmmmm
func_name notification-off-site 
 
 inputs {'notification-type': 'safe', 'telegram-pho

In [197]:
# in_degrees = dict(my_dag.in_degree())
# ready_nodes = [node for node, degree in in_degrees.items() if degree == 0]

In [198]:
def get_node_inputs(my_dag, node_id):
    node_data = my_dag.nodes[node_id]
    connected_inputs = node_data['connected_inputs']
    inputs = node_data['inputs']
    for k, connected_input in connected_inputs.items():
        if k not in inputs.keys():
            continue
        source_node = connected_input['source']
        source_output = connected_input['node_output']
        antecesor_node_data = my_dag.nodes[source_node]
        print('k', k)
        print('source_node', source_node)
        print('source_output',source_output)
        print("antecesor_node_data['outputs']", antecesor_node_data['outputs'])
        output_value = antecesor_node_data['outputs'][source_output]
        inputs[k] = output_value
    return inputs

def update_outputs(my_dag, node_id, results):
    node_data = my_dag.nodes[node_id]
    outputs = node_data['outputs']
    print(results)
    for k, output in outputs.items():
        outputs[k] = results[k]
        print('outputs actualizados', outputs)
    return my_dag

def process_node(my_dag, node_id):
    """
    Procesa un nodo ejecutando su función con todos sus inputs
    """
    print('--------------------------------')
    node_data = my_dag.nodes[node_id]
    func = node_data['func']

    print(f"Function name: {func.__name__}")
    inputs = get_node_inputs(my_dag, node_id)

    print(f"All inputs: {inputs}")
    
    # time.sleep(0.5)
    
    # # Ejecutar la función con el diccionario de inputs
    results = func(inputs=inputs)
    # print(f'Results for {node_id}:', results)
    my_dag = update_outputs(my_dag, node_id, results)
    # print(f'Results for {node_id}:', results)

In [199]:

# node_data = my_dag.nodes['yhtLPhFYEDOkfObJlH9L7']
# print(node_data)

# node_data = my_dag.nodes['FyyVHNJZeLUMIH3Es-h_k']
# print(node_data)

# node_data = my_dag.nodes['HmqswRo8MT9u3sDogYiiC']
# print(node_data)
# process_node(my_dag, 'FyyVHNJZeLUMIH3Es-h_k')
process_node(my_dag, "yhtLPhFYEDOkfObJlH9L7")
process_node(my_dag, "HmqswRo8MT9u3sDogYiiC")
process_node(my_dag, "1X5AYvGcQ6IscN7g1QQMI")
process_node(my_dag, "WVOSg_nGK-pNCveO409pC")
# process_node(my_dag, "18uQmbgHK_bSJeoOk4t-N")
# process_node(my_dag, "LGGNrUGwfH0hBALCp5lop")



--------------------------------
Function name: camera_node
All inputs: {'input-camera-name': 'almacen2'}
ejecutado camera_node con camera: almacen2
{'output-image-name': 'almacen2'}
outputs actualizados {'output-image-name': 'almacen2'}
--------------------------------
Function name: ai_node
k camera-name
source_node yhtLPhFYEDOkfObJlH9L7
source_output output-image-name
antecesor_node_data['outputs'] {'output-image-name': 'almacen2'}
All inputs: {'camera-name': 'almacen2'}
ejecutado ai_node para camara: almacen2
{'camera_detections': [{'object': 'person', 'confidence': 0.9171023964881897, 'id_tracking': 7, 'bbox': [0.5441161394119263, 0.5410915017127991, 0.6872852444648743, 0.9145181179046631], 'keypoints': None, 'contours': None, 'coord': [-7.224, 40.07], 'speed': [0.3586715, -12.65756]}, {'object': 'perro', 'confidence': 0.9171023964881897, 'id_tracking': 7, 'bbox': [0.5441161394119263, 0.5410915017127991, 0.6872852444648743, 0.9145181179046631], 'keypoints': None, 'contours': None,

In [200]:

def get_node_inputs(node_id, node_data):
    """
    Obtiene todos los inputs del nodo, tanto valores directos como conexiones
    """
    # Crear diccionario con los valores directos de los inputs
    # input_values = {
    #     inp['inputId']: inp['value'] 
    #     for inp in node_data.get('inputs', [])
    # }
    print('input_values', node_data['params'])
    # Buscar conexiones de entrada y actualizar valores
    for predecessor in my_dag.predecessors(node_id):
        pred_data = my_dag.nodes[predecessor]
        connections = [
            conn for conn in pred_data.get('connectedTo', {}).get('outputs', [])
            if conn['target'] == node_id
        ]
        
        for connection in connections:
            source_node = connection['source']
            source_handle = connection['sourceHandle']['nodeId']
            target_handle = connection['targetHandle']['nodeId']
            
            if source_node in results:
                # Manejar múltiples outputs (como en filter_node)
                if isinstance(results[source_node], tuple):
                    if source_handle == 'output-filtered-detections-pass':
                        input_values[target_handle] = results[source_node][0]
                    elif source_handle == 'output-filtered-detections-fail':
                        input_values[target_handle] = results[source_node][1]
                else:
                    input_values[target_handle] = results[source_node]
    
    return input_values

def process_node(node_id):
    """
    Procesa un nodo ejecutando su función con todos sus inputs
    """
    node_data = my_dag.nodes[node_id]
    func = node_data['func']
    # inputs = node_data['params']
    print(node_data['outputs'])
    # Obtener todos los inputs como un diccionario
    

    
    inputs = get_node_inputs(node_id, node_data)
    print('--------------------------------')
    print(f"Function name: {func.__name__}")
    print(f"All inputs: {inputs}")
    
    time.sleep(0.5)
    
    # Ejecutar la función con el diccionario de inputs
    results[node_id] = func(inputs=inputs)
    print(f'Results for {node_id}:', results[node_id])

# Crear un ejecutor para manejar la concurrencia
with ThreadPoolExecutor() as executor:
    # Inicializar la cola con nodos sin predecesores
    ready_nodes = [node for node, degree in in_degrees.items() if degree == 0]
    futures = {}

    while ready_nodes or futures:
        # Lanzar tareas para nodos listos
        for node_id in ready_nodes:
            futures[node_id] = executor.submit(process_node, node_id)

        ready_nodes = []

        # Esperar a que se completen algunas tareas
        for node_id, future in list(futures.items()):
            if future.done():
                future.result()  # Propagar excepciones si ocurrieron
                futures.pop(node_id)

                # Reducir el in-degree de los sucesores y agregar a la lista de listos
                for successor in my_dag.successors(node_id):
                    in_degrees[successor] -= 1
                    if in_degrees[successor] == 0:
                        ready_nodes.append(successor)


NameError: name 'in_degrees' is not defined