Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion packages/agent/src/netdriver_agent/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,23 @@ async def exec_cmd_in_vsys_and_mode(self, command: str, vsys: str = None, mode:
line = lines[i].strip()
output += await self.exec_cmd(line)
i += 1
return output
return output

async def switch_vsys_by_mode(self, command: str, mode: Mode = None) -> str:
"""
Switch VSYS by mode and output
"""
output = ""
if mode and self._mode != mode:
output += await self.switch_mode(mode)
lines = command.splitlines()
line_size = len(lines)
i = 0
while i < line_size:
line = lines[i].strip()
output += await self.exec_cmd(line)
i += 1
return output

async def send_cmd(self, command: str, vsys: str = None, mode: Mode = None,
timeout: float = 10, catch_error: bool = True, detail_output: bool = True) -> CmdTaskResult:
Expand Down
7 changes: 2 additions & 5 deletions packages/agent/src/netdriver_agent/plugins/array/array_ag.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ async def switch_vsys(self, vsys: str) -> str:
self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}")

output = ""
# Already in the target vsys
if vsys == self._vsys:
return output

ret : str
if vsys == ArrayBase._DEFAULT_VSYS:
# Switch to default vsys
ret = await self.exec_cmd_in_vsys_and_mode("exit", mode=Mode.ENABLE)
ret = await self.switch_vsys_by_mode("exit", mode=Mode.ENABLE)
output += ret
else:
# Switch to target vsys
ret = await self.exec_cmd_in_vsys_and_mode(f"switch {vsys}", mode=Mode.ENABLE)
ret = await self.switch_vsys_by_mode(f"switch {vsys}", mode=Mode.ENABLE)
output += ret

# Check if there is any error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,19 @@ async def switch_vsys(self, vsys: str) -> str:
self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}")

output = ""
# Already in the target vsys
if vsys == self._vsys:
return output

ret: str
if vsys == self._DEFAULT_VSYS:
# vsys -> default
ret = await self.exec_cmd_in_vsys_and_mode("end", mode=Mode.ENABLE)
ret = await self.switch_vsys_by_mode("end", mode=Mode.ENABLE)
output += ret
elif self._vsys == self._DEFAULT_VSYS:
# default -> vsys
ret = await self.exec_cmd_in_vsys_and_mode(f"config vdom\nedit {vsys}", mode=Mode.ENABLE)
ret = await self.switch_vsys_by_mode(f"config vdom\nedit {vsys}", mode=Mode.ENABLE)
output += ret
else:
# vsys1 -> vsys2
ret = await self.exec_cmd_in_vsys_and_mode(f"next\nedit {vsys}", mode=Mode.ENABLE)
ret = await self.switch_vsys_by_mode(f"next\nedit {vsys}", mode=Mode.ENABLE)
output += ret

# check errors
Expand Down
16 changes: 3 additions & 13 deletions packages/agent/src/netdriver_agent/plugins/huawei/huawei_usg.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,17 @@ def decide_current_vsys(self, prompt: str):
async def switch_vsys(self, vsys: str) -> str:
self._logger.info(f"Switching vsys: {self._vsys} -> {vsys}")

output = ""
# Already in the target vsys
if vsys == self._vsys:
return output

ret: str
if vsys == self._DEFAULT_VSYS:
ret = await self.exec_cmd_in_vsys_and_mode("quit", mode=Mode.ENABLE)
output += ret
else:
ret = await self.exec_cmd_in_vsys_and_mode(f"switch vsys {vsys}", mode=Mode.CONFIG)
output += ret
output = await self.switch_vsys_by_mode(f"switch vsys {vsys}", mode=Mode.CONFIG)

# check errors
err = utils.regex.catch_error_of_output(ret,
err = utils.regex.catch_error_of_output(output,
self.get_error_patterns(),
self.get_ignore_error_patterns())
if err:
self._logger.error(f"Switch vsys failed: {err}")
raise SwitchVsysFailed(err, output=output)

self._vsys = vsys
self._mode = Mode.ENABLE
self._logger.info(f"Switched vsys to: {self._vsys}")
return output
2 changes: 1 addition & 1 deletion packages/agent/src/netdriver_agent/plugins/maipu/maipu.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def get_error_patterns(self) -> list[re.Pattern]:
def get_ignore_error_patterns(self) -> list[re.Pattern]:
return MaiPuBase.PatternHelper.get_ignore_error_patterns()

def get_enable_password_pattern(self) -> re.Pattern:
def get_enable_password_prompt_pattern(self) -> re.Pattern:
return MaiPuBase.PatternHelper.get_enable_password_prompt_pattern()

def get_mode_prompt_patterns(self) -> dict[Mode, re.Pattern]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_error_patterns(self) -> list[re.Pattern]:
def get_ignore_error_patterns(self) -> list[re.Pattern]:
return VenustechBase.PatternHelper.get_ignore_error_patterns()

def get_enable_password_pattern(self) -> re.Pattern:
def get_enable_password_prompt_pattern(self) -> re.Pattern:
return VenustechBase.PatternHelper.get_enable_password_prompt_pattern()

def get_more_pattern(self) -> tuple[re.Pattern, str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,71 @@ def __init__(self, process: SSHServerProcess, conf_path: str = None):
conf_path = f"{cwd_path}/fortinet_fortigate.yml"
self.conf_path = conf_path
super().__init__(process)
self.level = []
self.level_type = []

@property
def prompt(self) -> str:
prompt = self.config.hostname
if self.level:
prompt = f'{prompt} ({self.level[-1]})'
return prompt + self.config.modes[self._mode].prompt

def switch_level(self, command: str) -> bool:
if command.startswith('config'):
commands = command.split(' ')
self.level_type.append(commands[0])
self.level.append(commands[-1])
elif command.startswith('edit'):
if self.level_type and self.level_type[-1] == 'config':
commands = command.split(' ')
self.level_type.append(commands[0])
self.level.append(commands[-1])
else:
return False
elif command == 'end':
if self.level_type:
is_edit = False
if self.level_type[-1] == 'edit':
is_edit = True
self.level.pop()
self.level_type.pop()
if is_edit:
self.level.pop()
self.level_type.pop()
else:
return False
elif command == 'next':
if self.level_type and self.level_type[-1] == 'edit':
self.level.pop()
self.level_type.pop()
else:
return False
elif command == 'exit' and self.level:
return False
return True

def exec_cmd_in_mode(self, command: str) -> str:
""" Execute command in current mode """
self._logger.info(f"Exec [{command} in {self._mode}]")
if command in self.config.modes[self._mode].cmd_map:
if not self.switch_level(command):
return self.config.invalid_cmd_error
return self.config.modes[self._mode].cmd_map[command]
elif command in self.config.common_cmd_map:
if not self.switch_level(command):
return self.config.invalid_cmd_error
return self.config.common_cmd_map[command]
else:
return self.config.invalid_cmd_error

async def switch_vsys(self, command: str) -> bool:
return False

async def switch_mode(self, command: str) -> bool:
if command not in self.config.modes[self._mode].switch_mode_cmds:
return False

match self._mode:
case Mode.ENABLE:
if command == "exit":
if command == "exit" and not self.level:
# logout
raise ClientExit
case _:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,28 @@ line_feed: "\r\n"
modes:
enable:
prompt: " # "
switch_mode_cmds:
- "exit"
switch_mode_cmds: []
cmds:
- cmd: "config global"
output: ""
- cmd: "config system console"
output: ""
- cmd: "set output standard"
output: ""
- cmd: "end"
output: ""
- cmd: "config vdom"
output: ""
- cmd: "edit root"
output: ""
- cmd: "next"
output: ""
- cmd: "config firewall policy"
output: ""
- cmd: "edit 1"
output: ""
- cmd: "config firewall address"
output: ""
common:
- cmd: "show full-configuration"
output: |
Expand Down
63 changes: 63 additions & 0 deletions tests/integration/test_fortinet_fortigate.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,66 @@ async def test_pull_config(test_client: TestClient, fortinet_fortigate_dev: dict
assert response.headers.get("x-correlation-id") == trace_id
assert not response.json().get("err_msg")
assert len(response.json().get("result")) == 1


@pytest.mark.integration
@pytest.mark.asyncio
async def test_switch_vsys(test_client: TestClient, fortinet_fortigate_dev: dict):
trace_id = uuid4().hex
response = test_client.post("/api/v1/cmd", headers={"x-correlation-id": trace_id}, json={
"protocol": fortinet_fortigate_dev.get("protocol"),
"ip": fortinet_fortigate_dev.get("ip"),
"port": fortinet_fortigate_dev.get("port"),
"username": fortinet_fortigate_dev.get("username"),
"password": fortinet_fortigate_dev.get("password"),
"enable_password": fortinet_fortigate_dev.get("enable_password"),
"vendor": "fortinet",
"model": "fortigate",
"version": "7.2",
"encode": "utf-8",
"vsys": "default",
"commands": [
{
"type": "raw",
"mode": "enable",
"command": "show full-configuration",
"template": ""
}
],
"timeout": 10
})

assert response.status_code == 200
assert response.json().get("code") == "OK"
assert response.headers.get("x-correlation-id") == trace_id
assert not response.json().get("err_msg")
assert len(response.json().get("result")) == 1

response = test_client.post("/api/v1/cmd", headers={"x-correlation-id": trace_id}, json={
"protocol": fortinet_fortigate_dev.get("protocol"),
"ip": fortinet_fortigate_dev.get("ip"),
"port": fortinet_fortigate_dev.get("port"),
"username": fortinet_fortigate_dev.get("username"),
"password": fortinet_fortigate_dev.get("password"),
"enable_password": fortinet_fortigate_dev.get("enable_password"),
"vendor": "fortinet",
"model": "fortigate",
"version": "7.2",
"encode": "utf-8",
"vsys": "root",
"commands": [
{
"type": "raw",
"mode": "enable",
"command": "show full-configuration",
"template": ""
}
],
"timeout": 10
})

assert response.status_code == 200
assert response.json().get("code") == "OK"
assert response.headers.get("x-correlation-id") == trace_id
assert not response.json().get("err_msg")
assert len(response.json().get("result")) == 1
Loading