In [2]:
import os
from netmiko import ConnectHandler
from netmiko.exceptions import NetmikoAuthenticationException, NetmikoTimeoutException

# --- 1. Define connection parameters ---
# Replace with your actual VPCS connection information
# Assume VPCS can be accessed via 192.168.1.3 port 5000 in GNS3
device = {
    # Try 'generic_telnet'
    "device_type": "generic_telnet", 
    
    # Replace with VPCS IP or GNS3 access IP
    "host": "192.168.1.3", 
    
    # Replace with Telnet port. In GNS3, this is usually the device console port
    "port": 5012, 
    
    # VPCS usually doesn't have username and password, but some GNS3 environments may require it.
    # If your VPCS doesn't have credentials, you can comment out or set to empty strings
    "username": "", 
    "password": "", 
    
    # VPCS may need time to startup, set longer connection timeout
    "timeout": 30,

}

# Your regular expression PC1> ...PC2> .... PC10>
PROMPT_REGEX = r"PC\d+>"

# --- 2. Command line execution ---
#test_command = "ip 10.0.0.2/24 10.0.0.1"  # VPCS command example
test_command = "ping 10.0.0.1"
try:
    print(f"Trying to connect using {device['device_type']} to {device['host']}:{device['port']}...")
    
    # Establish connection
    net_connect = ConnectHandler(**device)
    current_prompt = net_connect.find_prompt().strip()

    print(f"Device prompt identified as: { current_prompt}")

    print("✅ Connection successful!")
    
    # Send command
    # Note: VPCS CLI characteristics may require using send_command_timing
    # But we'll start with standard send_command first
    output = net_connect.send_command_timing(
        test_command,
        read_timeout=15, # 增加命令的等待时间
        delay_factor=2, # 针对此命令增加延迟
        #expect_string=PROMPT_REGEX
        )
    
    print(f"\n--- {test_command} output ---")
    print(output)
    print("------------------------\n")
    
    # Disconnect connection
    net_connect.disconnect()

except NetmikoAuthenticationException:
    print("❌ Authentication failed! Please check username and password (if VPCS requires it).")
except NetmikoTimeoutException:
    print(f"❌ Connection timeout! Please check if IP, port {device['port']} are correct and device is started.")
except Exception as e:
    print(f"❌ Other error occurred: {e}")

Trying to connect using generic_telnet to 192.168.1.3:5012...
Device prompt identified as: PC1>
✅ Connection successful!

--- ping 10.0.0.1 output ---

------------------------



In [16]:
# use telnetlib3
from telnetlib3 import Telnet
from time import sleep

host = "192.168.1.3"
port = 5012
command = "ip 10.10.0.11/24 10.10.0.254"
tn = Telnet()

tn.open(host = host, port = port, timeout = 30)

tn.write(b"\n")
sleep(0.5)
tn.write(b"\n")
sleep(0.5)
tn.write(b"\n")
sleep(0.5)
tn.write(b"\n")
sleep(0.5)

tn.expect([br"PC\d+>"])

tn.write(command.encode(encoding='ascii') + b"\n")
sleep(5)

tn.expect([br"PC\d+>"])

result = tn.read_very_eager().decode('utf-8')

print(result)
tn.close()


 
PC1> 
PC1> ip 10.10.0.11/24 10.10.0.254
Checking for duplicate address...
PC1 : 10.10.0.11 255.255.255.0 gateway 10.10.0.254

PC1> 


In [25]:
# use telnetlib3 threading
import threading
import json
from telnetlib3 import Telnet
from time import sleep

def connect_and_execute(host, port, command, results_dict, device_name):
    """simple device connect and execute"""
    tn = Telnet()

    try:
        tn.open(
            host = host,
            port = port,
            timeout = 30
        )

        tn.write(b"\n")
        sleep(0.5)
        tn.write(b"\n")
        sleep(0.5)
        tn.write(b"\n")
        sleep(0.5)
        tn.write(b"\n")
        sleep(0.5)

        tn.expect([br"PC\d+>"])

        tn.write(command.encode(encoding='ascii') + b"\n")
        sleep(5)

        tn.expect([br"PC\d+>"])

        result = tn.read_very_eager().decode('utf-8')

        results_dict[device_name] = {
            "status": "success",
            "device": f"{host}:{port}",
            "command":command,
            "output": result
        }

    except Exception as e:
        results_dict[device_name] = {
            "status": "error",
            "device": f"{host}:{port}",
            "command": command,
            "error": str(e)
        }
    
    finally:
        tn.close()

devices = {
    "PC1": ("192.168.1.3", 5012, "ip 10.10.0.12/24 10.10.0.254"),
    "PC2": ("192.168.1.3", 5014, "ip 10.10.0.13/24 10.10.0.254"),
    "PC3": ("192.168.1.3", 5016, "ip 10.20.0.12/24 10.20.0.254"),
    "PC4": ("192.168.1.3", 5018, "ip 10.20.0.13/24 10.20.0.254"),
}

# result dict
results = {}
# create thread pod
threads = []

for device_name, (host, port, command) in devices.items():
    thread = threading.Thread(
        target= connect_and_execute, 
        args=(
            host,
            port,
            command,
            results,
            device_name
            )
    )
    threads.append(thread)
    thread.start()

# wait all thread done
for thread in threads:
    thread.join()

# print results
print("==== results ====")
print(
    json.dumps(
        results,
        indent=2,
        ensure_ascii=False
    )
)

==== results ====
{
  "PC3": {
    "status": "success",
    "device": "192.168.1.3:5016",
    "command": "ip 10.20.0.12/24 10.20.0.254",
    "output": " \r\n\rPC3> \r\n\rPC3> ip 10.20.0.12/24 10.20.0.254\r\nChecking for duplicate address...\r\nPC3 : 10.20.0.12 255.255.255.0 gateway 10.20.0.254\r\n\r\n\rPC3> "
  },
  "PC1": {
    "status": "success",
    "device": "192.168.1.3:5012",
    "command": "ip 10.10.0.12/24 10.10.0.254",
    "output": " \r\n\rPC1> \r\n\rPC1> ip 10.10.0.12/24 10.10.0.254\r\nChecking for duplicate address...\r\nPC1 : 10.10.0.12 255.255.255.0 gateway 10.10.0.254\r\n\r\n\rPC1> "
  },
  "PC4": {
    "status": "success",
    "device": "192.168.1.3:5018",
    "command": "ip 10.20.0.13/24 10.20.0.254",
    "output": " \r\n\rPC4> \r\n\rPC4> ip 10.20.0.13/24 10.20.0.254\r\nChecking for duplicate address...\r\nPC4 : 10.20.0.13 255.255.255.0 gateway 10.20.0.254\r\n\r\n\rPC4> "
  },
  "PC2": {
    "status": "success",
    "device": "192.168.1.3:5014",
    "command": "ip 10.

In [5]:
"""
Multi-device VPCS command execution tool using telnetlib3 with threading.
Supports concurrent execution of multiple command groups across multiple VPCS devices.
"""
import json
import threading
from typing import List, Dict, Any
from telnetlib3 import Telnet
from time import sleep
from langchain.tools import BaseTool
from public_model.get_gns3_device_port import get_device_ports_from_topology
from log_config import setup_tool_logger

logger = setup_tool_logger("vpcs_multi_commands")

class VPCSMultiCommands(BaseTool):
    """
    A tool to execute multiple command groups across multiple VPCS devices concurrently.
    Supports parallel execution with threading for improved performance.
    
    Input should be a JSON array containing command group objects.
    Example input:
        [
            {
                "device_name": "PC1",
                "commands": ["ip 10.10.0.12/24 10.10.0.254", "ping 10.10.0.254"]
            },
            {
                "device_name": "PC2", 
                "commands": ["ip 10.10.0.13/24 10.10.0.254"]
            }
        ]
    
    Returns a list of results, one for each command group.
    """
    
    name: str = "execute_vpcs_multi_commands"
    description: str = """
    Executes multiple command groups across multiple VPCS devices concurrently using telnetlib3.
    Supports parallel execution with threading for improved performance.
    
    Input should be a JSON array containing command group objects with device_name and commands.
    Example input:
        [
            {
                "device_name": "PC1",
                "commands": ["ip 10.10.0.12/24 10.10.0.254", "ping 10.10.0.254"]
            },
            {
                "device_name": "PC2", 
                "commands": ["ip 10.10.0.13/24 10.10.0.254"]
            }
        ]
    
    Returns a list of results, each containing device_name, status, output, and commands.
    """

    def _connect_and_execute_commands(self, device_name, commands, results_list, index, device_ports, gns3_host):
        """连接设备并执行多个命令的内部方法"""
        
        # 检查设备是否存在端口信息
        if device_name not in device_ports:
            results_list[index] = {
                "device_name": device_name,
                "status": "error",
                "output": f"Device '{device_name}' not found in topology or missing console port",
                "commands": commands
            }
            return
        
        port = device_ports[device_name]["port"]
        host = gns3_host
        
        tn = Telnet()
        try:
            tn.open(host=host, port=port, timeout=30)
            
            # 初始化连接
            tn.write(b"\n")
            sleep(0.5)
            tn.write(b"\n") 
            sleep(0.5)
            tn.write(b"\n")
            sleep(0.5)
            tn.write(b"\n")
            sleep(0.5)
            tn.expect([br"PC\d+>"])
            
            # 执行所有命令并合并输出
            combined_output = ""
            for command in commands:
                tn.write(command.encode(encoding='ascii') + b"\n")
                sleep(5)
                tn.expect([br"PC\d+>"])
                output = tn.read_very_eager().decode('utf-8')
                combined_output += output
                
            # 添加结果到列表
            results_list[index] = {
                "device_name": device_name,
                "status": "success",
                "output": combined_output,
                "commands": commands
            }
            
        except Exception as e:
            results_list[index] = {
                "device_name": device_name,
                "status": "error", 
                "output": str(e),
                "commands": commands
            }
        finally:
            tn.close()

    def _run(self, tool_input: str, run_manager=None) -> List[Dict[str, Any]]:
        """执行多设备多命令的主要方法"""
    
        try:
            command_groups = json.loads(tool_input)
            if not isinstance(command_groups, list):
                return [{"error": "Input must be a JSON array of command group objects"}]
        except json.JSONDecodeError as e:
            logger.error("Invalid JSON input: %s", e)
            return [{"error": f"Invalid JSON input: {e}"}]

        # 从输入中提取所有设备名称
        device_names = list(set([cmd_group["device_name"] for cmd_group in command_groups]))

        # 获取设备端口映射
        device_ports = get_device_ports_from_topology(device_names)

        # 从环境变量获取主机IP
        gns3_host = os.getenv("GNS3_SERVER_HOST", "127.0.0.1")

        # 初始化结果列表（预分配空间以支持并发写入）
        results = [None] * len(command_groups)
        threads = []

        # 为每个命令组创建线程
        for i, cmd_group in enumerate(command_groups):
            thread = threading.Thread(
                target=self._connect_and_execute_commands,
                args=(
                    cmd_group["device_name"],
                    cmd_group["commands"], 
                    results,
                    i,
                    device_ports,
                    gns3_host
                )
            )
            threads.append(thread)
            thread.start()

        # 等待所有线程完成
        for thread in threads:
            thread.join()

        #logger.info("Multi-device command execution completed. Results: %s", 
        #            json.dumps(results, indent=2, ensure_ascii=False))

        return results

if __name__ == "__main__":
    # Example usage
    command_groups = [
        {
            "device_name": "PC1",
            "commands": ["ip 10.10.0.12/24 10.10.0.254", "ping 10.10.0.254"]
        },
        {
            "device_name": "PC2", 
            "commands": ["ip 10.10.0.13/24 10.10.0.254"]
        },
        {
            "device_name": "PC3",
            "commands": ["ip 10.20.0.22/24 10.20.0.254", "ping 10.20.0.254"]
        },
        {
            "device_name": "PC4", 
            "commands": ["ip 10.20.0.23/24 10.20.0.254"]
        }
    ]

    exe_cmd = VPCSMultiCommands()
    result = exe_cmd._run(tool_input=json.dumps(command_groups))
    print("Execution results:")
    print(json.dumps(result, indent=2, ensure_ascii=False))


Execution results:
[
  {
    "device_name": "PC1",
    "status": "success",
    "output": " \r\n\rPC1> \r\n\rPC1> ip 10.10.0.12/24 10.10.0.254\r\nChecking for duplicate address...\r\nPC1 : 10.10.0.12 255.255.255.0 gateway 10.10.0.254\r\n\r\n\rPC1>  ",
    "commands": [
      "ip 10.10.0.12/24 10.10.0.254",
      "ping 10.10.0.254"
    ]
  },
  {
    "device_name": "PC2",
    "status": "success",
    "output": " \r\n\rPC2> \r\n\rPC2> ip 10.10.0.13/24 10.10.0.254\r\nChecking for duplicate address...\r\nPC2 : 10.10.0.13 255.255.255.0 gateway 10.10.0.254\r\n\r\n\rPC2> ",
    "commands": [
      "ip 10.10.0.13/24 10.10.0.254"
    ]
  },
  {
    "device_name": "PC3",
    "status": "success",
    "output": " \r\n\rPC3> \r\n\rPC3> ip 10.20.0.22/24 10.20.0.254\r\nChecking for duplicate address...\r\nPC3 : 10.20.0.22 255.255.255.0 gateway 10.20.0.254\r\n\r\n\rPC3>  ",
    "commands": [
      "ip 10.20.0.22/24 10.20.0.254",
      "ping 10.20.0.254"
    ]
  },
  {
    "device_name": "PC4",
    "st