testing

In [1]:
import re
import json
import mysql.connector
import socket
import threading
import time

def receive_data(xml_message):
    """
    Parses an incoming XML-like message and extracts tags and data.
    Returns a dictionary with 'tags' and 'data'.
    """
    tag_pattern = r'"tag": "([^"]+)"'
    data_pattern = r'"data": \{([^}]+)\}'

    tags_match = re.search(tag_pattern, xml_message)
    data_match = re.search(data_pattern, xml_message)

    tags = [tag.strip() for tag in tags_match.group(1).split(',')] if tags_match else []
    data = [datum.strip().strip("'\"") for datum in data_match.group(1).split(',')] if data_match else []

    parsed_data = {
        "tags": tags,
        "data": data
    }

    print("Step 1: Parsed Data ->", json.dumps(parsed_data, indent=4))
    return parsed_data

def store_instant_solution(parsed_data, instant_solution, db_connection):
    """
    Stores parsed data in the instant_solution array.
    Merges messages with existing tag_ids instead of duplicating entries.
    """
    cursor = db_connection.cursor()
    updated_solution = instant_solution.copy()

    for tag in parsed_data["tags"]:
        # Fetch tag_id from Tags Table
        cursor.execute("SELECT tag_id FROM tag_info WHERE tag_name = %s", (tag,))
        tag_id_result = cursor.fetchone()

        if tag_id_result:
            tag_id = tag_id_result[0]
            data_string = ", ".join(parsed_data["data"])

            # Check if tag_id already exists in instant_solution
            for entry in updated_solution:
                if entry[0] == tag_id:
                    entry[1] += 1  # Increment message count
                    entry.append(data_string)  # Append new data
                    break
            else:
                updated_solution.append([tag_id, 1, data_string])

    cursor.close()
    print("Step 2: Updated Instant Solution ->", updated_solution)
    return updated_solution

def send_data(port, data):
    """
    Sends data to the respective agent using TCP socket with timeout.
    Logs connection errors without raising an exception.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Set a timeout for connection attempt
            s.settimeout(5)
            s.connect(("localhost", port))
            s.sendall(data.encode())
            print(f"Sent data to port {port}: {data}")
    except (socket.timeout, ConnectionRefusedError) as e:
        print(f"Connection error to port {port}: {e}")
        # Log the error but don't stop the entire process
        return False
    except Exception as e:
        print(f"Unexpected error sending data to port {port}: {e}")
        return False
    return True

def forward_messages(instant_solution, db_connection):
    """
    Iterates through instant_solution, finds agents, sends messages,
    and updates the array after sending.
    """
    cursor = db_connection.cursor()
    
    # Create a copy of the list to safely iterate and modify
    for entry in instant_solution[:]:  # Iterate over a copy to avoid modification issues
        tag_id = entry[0]
        message_count = entry[1]

        # Fetch all agent_ids assigned to this tag_id
        cursor.execute("SELECT agent_id FROM agent_tags WHERE tag_id = %s", (tag_id,))
        agent_id_results = cursor.fetchall()
        print(f"Step 2.1: Agents assigned to tag {tag_id} -> {agent_id_results}")

        for agent_id_result in agent_id_results:
            agent_id = agent_id_result[0]

            # Fetch agent's port_number
            cursor.execute("SELECT port_number FROM agent_info WHERE agent_id = %s", (agent_id,))
            port_result = cursor.fetchone()

            if port_result:
                port_number = port_result[0]
                print(f"Step 2.2: Agent {agent_id} assigned to tag {tag_id} has port {port_number}")

                # Send first data message from list
                data_to_send = entry[2]

                # Send data and check if successful
                if send_data(port_number, data_to_send):
                    # Reduce count and remove message only after all agents receive it
                    message_count -= 1

        # If all messages for a tag are sent, remove the entry
        if message_count <= 0:
            instant_solution.remove(entry)

    cursor.close()
    print("Step 3: Instant Solution after Processing ->", instant_solution)
    return instant_solution


def mock_agent_server(port):
    """
    Mock agent server to simulate receiving messages.
    Useful for testing without actual agent implementations.
    """
    def start_server():
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # Allow address reuse
            s.bind(('localhost', port))
            s.listen(1)
            print(f"Mock server listening on port {port}")
            
            while True:
                try:
                    conn, addr = s.accept()
                    with conn:
                        print(f"Connected by {addr}")
                        data = conn.recv(1024)
                        print(f"Received on port {port}: {data.decode()}")
                except Exception as e:
                    print(f"Error in mock server on port {port}: {e}")
                    break

    # Start server in a separate thread
    server_thread = threading.Thread(target=start_server, daemon=True)
    server_thread.start()


def main():
    # Establish database connection
    db_connection = mysql.connector.connect(
        host="localhost",
        user="root",
        password="",
        database="agent_flow",
        port='3307'
    )

    # Start mock agent servers for testing
    mock_agent_server(5000)  # Agent A
    mock_agent_server(5001)  # Agent C
    mock_agent_server(5002)  # Agent C
    time.sleep(1)  # Give servers time to start

    # Sample input message
    input_message = '''<message>
{
    <tags>
    "tag": "Machine Learning,Data Processing"
    </tags>
    <data>
    "data": {"input1", "input2"}
    </data>
}
</message>'''

    # Initialize instant solution list
    instant_solution = []

    try:
        # Parse incoming message
        parsed_data = receive_data(input_message)

        # Store instant solution
        instant_solution = store_instant_solution(parsed_data, instant_solution, db_connection)

        # Forward messages
        forward_messages(instant_solution, db_connection)

    except Exception as e:
        print(f"An error occurred: {e}")

    finally:
        # Close database connection
        db_connection.close()

# Uncomment and configure for actual use
if __name__ == "__main__":
    main()

Mock server listening on port 5000
Mock server listening on port 5001
Mock server listening on port 5002
Step 1: Parsed Data -> {
    "tags": [
        "Machine Learning",
        "Data Processing"
    ],
    "data": [
        "input1",
        "input2"
    ]
}
Step 2: Updated Instant Solution -> [[1, 1, 'input1, input2'], [2, 1, 'input1, input2']]
Step 2.1: Agents assigned to tag 1 -> [(1,), (3,)]
Step 2.2: Agent 1 assigned to tag 1 has port 5000
Connected by ('127.0.0.1', 54888)
Sent data to port 5000: input1, input2
Received on port 5000: input1, input2
Step 2.2: Agent 3 assigned to tag 1 has port 5002
Connected by ('127.0.0.1', 54889)
Sent data to port 5002: input1, input2
Received on port 5002: input1, input2
Step 2.1: Agents assigned to tag 2 -> [(1,), (2,)]
Step 2.2: Agent 1 assigned to tag 2 has port 5000
Connected by ('127.0.0.1', 54890)
Sent data to port 5000: input1, input2
Received on port 5000: input1, input2
Step 2.2: Agent 2 assigned to tag 2 has port 5001
Connected by (