
System transport session.read() stuck indefinitely #233

Closed
netixx opened this issue May 4, 2022 · 17 comments
Labels
bug Something isn't working

Comments

@netixx

netixx commented May 4, 2022

Describe the bug
The system transport code seems to get stuck indefinitely while reading from the device. This should be impossible, as the read is wrapped with @TransportTimeout; however, we have seen it stuck for at least 30 minutes, which means the timeout exception does not fire.

I think this has to do with the use of scrapli in threads, because I couldn't reproduce it with a simple script.

To Reproduce

We are using nornir-scrapli, getting config via netconf for around 60 devices.
For around 1 out of 3 runs, one of the threads gets stuck on the self.fileobj.read1(size) call.
The behavior seems quite random (we couldn't single out a host).

The full setup is as follows:

  • An RQ worker process monitors a Redis queue
  • Upon consuming a new task, the worker forks a child process
  • The task consists of a nornir call that spawns threads using a ThreadPool (we have set a limit of 10 workers)
  • Each of the threads starts an ssh process
  • We can see that one of the threads is blocked on the read1 call because we dump the frames when the worker kills the task (after a 5-minute timeout)

We only have Cisco IOS-XR platforms at the moment (but I don't think that's relevant, as this seems to be a low-level issue).

Expected behavior
A timeout exception should have been raised when the call to session.read takes too long (more than 30s by default).

Stack Trace

File: "/usr/local/lib/python3.9/site-packages/scrapli/transport/plugins/system/transport.py", line 158, in read
  buf = self.session.read(65535)
File: "/usr/local/lib/python3.9/site-packages/scrapli/transport/plugins/system/ptyprocess.py", line 488, in read
  s = self.fileobj.read1(size)

OS

  • OS: Debian (container), MacOS
  • scrapli version: 2022.1.30
@netixx netixx added the bug Something isn't working label May 4, 2022
@carlmontanari
Owner

Hey @netixx thanks for opening this!

Are you able to share more of the frame dump? I want to try to see which timeout flavor is being run. It should be the multiprocessing timeout... since we can't use signals outside of the main thread (and it should always be the multiprocessing timeout with system transport anyway), but it's possible we have a bug in the selection of the timeout flavor.

Also, just for sanity's sake, can you confirm that transport_timeout is non-zero? If set to zero we just skip the transport timeout wrapper entirely (as a way to spawn fewer threads/have less overhead). In this case I suspect that wouldn't matter, because unless you are using conn.transport.read directly, things should also be wrapped by the channel operation timeout wrapper... but just trying to eliminate any obvious issues.
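(For reference, roughly where that knob lives if you build a scrapli connection directly -- a minimal sketch only, the host/credentials are placeholders and the nornir-scrapli connection-options plumbing is left out:)

# Minimal sketch, core scrapli only (not the nornir-scrapli setup from this issue).
from scrapli.driver.core import IOSXRDriver

conn = IOSXRDriver(
    host="router1.example.com",
    auth_username="user",
    auth_password="pass",
    auth_strict_key=False,
    timeout_transport=30.0,  # the default; setting this to 0 skips the transport timeout wrapper
)
conn.open()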

How difficult would it be to share a minimum reproducible compose file? That would be pretty awesome to help hunt this down if it is at all possible!

Totally unrelated, but, out of curiosity, why bother with nornir at all if you already have the means/setup to spawn more processes?

Thanks again!

Carl

@carlmontanari
Owner

Almost forgot to add -- I would bet ssh2/paramiko will work in this scenario... and if you are running this in containers it's likely that all the benefits/things I like about system transport are pretty pointless anyway (since you probably are not putting weird openssh things like proxy jump/control persist/custom kex/cipher/etc. in your container). So... I definitely want to find the root cause and fix it, but it may be worth giving one of the other transports a shot to see if that sorts things as well!
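(Rough sketch of swapping transports to test that -- core scrapli shown; as far as I recall the same transport argument applies to the netconf driver, and "paramiko"/"ssh2" need their extra packages installed:)

# Sketch only: same style of connection, but forcing a non-system transport.
from scrapli.driver.core import IOSXRDriver

conn = IOSXRDriver(
    host="router1.example.com",
    auth_username="user",
    auth_password="pass",
    auth_strict_key=False,
    transport="paramiko",  # or "ssh2"; the default is "system"
)
conn.open()
print(conn.get_prompt())
conn.close()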

@netixx
Author

netixx commented May 5, 2022

Hello,

Here is the full frame dump I gathered:

Caught 14, dumping frames

# Thread: ThreadPoolExecutor-6196_0(140270611986176)
File: "/usr/local/lib/python3.9/threading.py", line 930, in _bootstrap
  self._bootstrap_inner()
File: "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
  self.run()
File: "/usr/local/lib/python3.9/threading.py", line 910, in run
  self._target(*self._args, **self._kwargs)
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 83, in _worker
  work_item.run()
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run
  result = self.fn(*self.args, **self.kwargs)
File: "/usr/local/lib/python3.9/site-packages/scrapli/transport/plugins/system/transport.py", line 158, in read
  buf = self.session.read(65535)
File: "/usr/local/lib/python3.9/site-packages/scrapli/transport/plugins/system/ptyprocess.py", line 488, in read
  s = self.fileobj.read1(size)

# Thread: ThreadPoolExecutor-0_1(140272069904128)
File: "/usr/local/lib/python3.9/threading.py", line 930, in _bootstrap
  self._bootstrap_inner()
File: "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
  self.run()
File: "/usr/local/lib/python3.9/threading.py", line 910, in run
  self._target(*self._args, **self._kwargs)
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 83, in _worker
  work_item.run()
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 58, in run
  result = self.fn(*self.args, **self.kwargs)
File: "/usr/local/lib/python3.9/site-packages/nornir/core/task.py", line 99, in start
  r = self.task(self, **self.params)
File: "/comet/comet/tasks/get_configuration.py", line 22, in get_configuration
  response = task.run(
File: "/usr/local/lib/python3.9/site-packages/nornir/core/task.py", line 169, in run
  r = run_task.start(self.host)
File: "/usr/local/lib/python3.9/site-packages/nornir/core/task.py", line 99, in start
  r = self.task(self, **self.params)
File: "/usr/local/lib/python3.9/site-packages/nornir_scrapli/tasks/netconf/get_config.py", line 32, in netconf_get_config
  scrapli_response = scrapli_conn.get_config(
File: "/usr/local/lib/python3.9/site-packages/scrapli_netconf/driver/sync_driver.py", line 175, in get_config
  raw_response = self.channel.send_input_netconf(response.channel_input)
File: "/usr/local/lib/python3.9/site-packages/scrapli_netconf/channel/sync_channel.py", line 285, in send_input_netconf
  buf = self._read_until_prompt(buf=b"")
File: "/usr/local/lib/python3.9/site-packages/scrapli/channel/sync_channel.py", line 137, in _read_until_prompt
  read_buf.write(self.read())
File: "/usr/local/lib/python3.9/site-packages/scrapli/channel/sync_channel.py", line 69, in read
  buf = self.transport.read()
File: "/usr/local/lib/python3.9/site-packages/scrapli/decorators.py", line 99, in decorate
  return self._multiprocessing_timeout(
File: "/usr/local/lib/python3.9/site-packages/scrapli/decorators.py", line 176, in _multiprocessing_timeout
  self._handle_timeout()
File: "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 637, in __exit__
  self.shutdown(wait=True)
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 235, in shutdown
  t.join()
File: "/usr/local/lib/python3.9/threading.py", line 1053, in join
  self._wait_for_tstate_lock()
File: "/usr/local/lib/python3.9/threading.py", line 1073, in _wait_for_tstate_lock
  if lock.acquire(block, timeout):

# Thread: MainThread(140272200296256)
File: "/comet/comet/api/worker.py", line 22, in <module>
  worker.work()
File: "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 606, in work
  self.execute_job(job, queue)
File: "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 866, in execute_job
  self.fork_work_horse(job, queue)
File: "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 775, in fork_work_horse
  self.main_work_horse(job, queue)
File: "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 901, in main_work_horse
  self.perform_job(job, queue)
File: "/usr/local/lib/python3.9/site-packages/rq/worker.py", line 1061, in perform_job
  rv = job.perform()
File: "/usr/local/lib/python3.9/site-packages/rq/job.py", line 821, in perform
  self._result = self._execute()
File: "/usr/local/lib/python3.9/site-packages/rq/job.py", line 844, in _execute
  result = self.func(*self.args, **self.kwargs)
File: "/comet/comet/api/app.py", line 62, in comet_run
  result = run(args)
File: "/comet/comet/run.py", line 81, in run
  result["aggregated_result"] = nornir_engine.run(
File: "/usr/local/lib/python3.9/site-packages/nornir/core/__init__.py", line 139, in run
  result = self.runner.run(task, run_on)
File: "/usr/local/lib/python3.9/site-packages/nornir/plugins/runners/__init__.py", line 40, in run
  futures.append(future)
File: "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 637, in __exit__
  self.shutdown(wait=True)
File: "/usr/local/lib/python3.9/concurrent/futures/thread.py", line 235, in shutdown
  t.join()
File: "/usr/local/lib/python3.9/threading.py", line 1053, in join
  self._wait_for_tstate_lock()
File: "/usr/local/lib/python3.9/threading.py", line 1073, in _wait_for_tstate_lock
  if lock.acquire(block, timeout): 

I can confirm that transport_timeout is non-zero; I am using the default value of 30.0 (checked with a debugger breakpoint).

We are using fancy ssh features (we connect through a special proxy that basically just forwards stdin/stdout to/from the devices - https://github.com/ovh/the-bastion) - this is one of the reasons we use scrapli.

The basic principle of the app is as follows: a user submits a check on one or more devices in the network in the frontend, the worker consumes the tasks and performs the check, returning the result to the frontend via a queue.

Nornir helps us streamline our code by taking care of:

  • the connection to the device inventory (netbox)
  • the parallelism of the connections to the devices

Here is an MVP script:

#!/usr/bin/env python3

from nornir_scrapli.tasks import netconf_get_config
from nornir import InitNornir
from nornir.core.plugins.inventory import TransformFunctionRegister
from nornir.core.inventory import Host, ConnectionOptions
from typing import Any
import os

import threading, sys, traceback, signal # pylint: disable=import-outside-toplevel,multiple-imports

# FOR DEBUG: dump the stacktrace when we receive the SIGALARM signal
# from RQ worker
# default_handler = signal.getsignal(signal.SIGALRM)

def dumpstacks(sig, frame):
    # pylint: disable = E, W, R, C
    print("Caught {}, dumping frames".format(sig))
    sys.stdout.flush()
    id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
    code = []
    for threadId, stack in sys._current_frames().items():
        code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            if name == "dumpstacks":
                # ignore this code in the stacktrace
                continue
            code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if line:
                code.append("  %s" % (line.strip()))
    print("\n".join(code))
    sys.stdout.flush()
    # default_handler(sig, frame)

signal.signal(signal.SIGALRM, dumpstacks)

args = {
  "netbox_url": "<url>",
  "netbox_tenant": "<tenant>",
  "netbox_token": "<token>",
  "filter_hostname": [],
}

config = {
  "runner": {
      "plugin": "threaded",
      "options": {
          "num_workers": 30, # 30: around 800M peak
      },
  },
  "core": {
      "raise_on_error": False,
  },
  # "logging": {
  #     "enabled": False,
  # },
  "inventory": {
      "plugin": "NetBoxInventory2",
      "options": {
          "nb_url": args["netbox_url"],
          "nb_token": args["netbox_token"],
          "use_platform_slug": True,
          "filter_parameters": {
              "name": args["filter_hostname"],
              "tenant": [
                  args["netbox_tenant"],
              ],
              # forcing implemented platforms statically here
              "platform": [
                  "cisco-ios-xr",
              ],
              "status": "active",
          },
      },
  },
}

task = netconf_get_config

print("forking")
# fork for consistency with RQ, but is not required to reproduce
pid = os.fork()

if pid == 0:
  print("child process running")
  # run task in the child process
  with InitNornir(**config) as nornir_engine:
      print("Running nornir engine")
      # run the tasks
      nornir_engine.run(
          task=task,
      )
  print("nornir done")
else:
  os.waitpid(pid, 0)
print("done")

@carlmontanari
Owner

Awesome, thanks for all the detail! I’ll try to poke around with this over the next few days to see if I can reproduce and figure out what’s going on!

@carlmontanari
Owner

Would it be possible to get logs from one of the failing devices as well? (scrapli logs I mean). Curious to see what that looks like.

Spent a bit of time poking around (and trying to re-remember how the timeouts work!). I think I was able to mostly recreate this by patching the read1 method and making it block. If that read blocks, the thread we run the read in (the one the decorator spawns) can never finish, and we have no way of externally killing it (afaik).

So, in my testing I would make that block for say 10s but have a 2s timeout value -- after 10s the "read1" thing would unblock, and then the decorator could finally raise the exception/close the connection.

I'm not 100% sure how/where that read1 method can get stuck blocking in real life (logs may help with that?). If I connect to an XR box via netconf (just to be as similar as possible to your issue -- I imagine, as you said, that this happens regardless of the target device) and set a timeout lower than what the response should take to return, the decorator kills the connection, which causes read1 to not be able to read, and things work as you'd expect. So I'm not 100% sure I've got all this sorted in my head yet, but I wanted to comment to keep you in the loop, for posterity, and to ask about those logs.
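(A simplified sketch of that kind of patch -- not my exact test code, and it assumes the vendored class keeps the upstream PtyProcess name -- just to show the idea of making the pty read block longer than the timeout:)

# Simplified sketch: monkeypatch the pty read so it blocks for longer than the
# transport timeout, simulating a device that just stops responding.
import time

from scrapli.transport.plugins.system import ptyprocess

_orig_read = ptyprocess.PtyProcess.read

def _blocking_read(self, size):
    time.sleep(10)  # no data for 10s while the transport timeout is ~2s
    return _orig_read(self, size)

ptyprocess.PtyProcess.read = _blocking_read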

Carl

@netixx
Author

netixx commented May 6, 2022

Just to make sure I understand:
the issue is that because we are wrapping the call in a thread to monitor for the timeout, and there is no real way to kill a thread in Python, the program will deadlock when the read call is blocking?

Shouldn't we run the 'monitoring' loop in a dedicated subprocess then (this would need an overhaul, because we cannot spawn a process for each call to read()!)? That way we can kill the process if it gets stuck.
Also, I see that when we are in the main thread, we use signals. Couldn't something similar be achieved (with subprocesses or otherwise)?

Another idea: in the case of SystemTransport (which I think is also part of the cause of this issue), we could kill the underlying SSH process when the timeout fires - this should release the read() call?

I gathered the scrapli logs by adding the thread-id to the logging formatter and matching that to the thread that is stuck:

2022-05-06 14:47:41,008 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - attempting in channel netconf authentication
2022-05-06 14:47:41,172 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b"Warning: Permanently added '########' (ECDSA) to the list of known hosts.\n"
2022-05-06 14:47:41,340 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'######            |\n*------------------------------------------------------------------------------*\n'
2022-05-06 14:47:41,833 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b"#####\n"
2022-05-06 14:47:42,126 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'######.\n'
2022-05-06 14:47:42,153 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'#####'
2022-05-06 14:47:42,153 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - write: REDACTED
2022-05-06 14:47:42,154 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - write: '\n'
2022-05-06 14:47:42,183 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'\n'
2022-05-06 14:47:42,744 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'\n\ndevice banner\n\n\n'
2022-05-06 14:47:42,812 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'Password: \n'
2022-05-06 14:47:42,812 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - write: REDACTED
2022-05-06 14:47:42,813 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - write: '\n'
2022-05-06 14:47:42,865 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'\n'
2022-05-06 14:47:43,152 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - DEBUG - read: b'<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">\n <capabilities>\n  <capability>urn:ietf:params:netconf:base:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:candidate:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:rollback-on-error:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:validate:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:confirmed-commit:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:notification:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:interleave:1.0</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-infra-systemmib-cfg?module=Cisco-IOS-XR-infra-systemmib-cfg&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-ipv4-autorp-datatypes?module=Cisco-IOS-XR-ipv4-autorp-datatypes&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-perf-meas-cfg?module=Cisco-IOS-XR-perf-mea'
2022-05-06 14:47:43,154 - 123145901084672 ThreadPoolExecutor-119_0 - scrapli.channel - INFO - found start of server capabilities, authentication successful

It seems that the device stops sending its capabilities mid-flight!
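(For reference, the thread id/name in those log lines comes from the standard logging fields, with a formatter along these lines -- nothing scrapli-specific:)

# Plain stdlib logging: %(thread)d and %(threadName)s put the thread id/name in each record.
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(thread)d %(threadName)s - %(name)s - %(levelname)s - %(message)s",
)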

@carlmontanari
Owner

the issue is that because we are wrapping the call in a thread to monitor for the timeout, and there is no real way to kill a thread in Python, the program will deadlock when the read call is blocking?

Yeah, no way to kill the thread for sure -- as far as read actually blocking and being the thing that is causing this, that is my best guess!

Shouldn't we run the 'monitoring' loop in a dedicated subprocess then (this would need an overhaul, because we cannot spawn a process for each call to read()!)? That way we can kill the process if it gets stuck.

Maybe?! 😁 If I'm understanding you, are you basically saying to run the timeout wrapper as a process so that we can kill the process and not have to worry about the thread deadlocking us? My initial reaction to this is -- "that would be very costly". In fact, my personal "settings" when I use scrapli literally disable the transport timeout entirely, as this spawns many fewer threads, which uses less cpu and makes things a tiny bit faster. This could potentially also be a short-term fix for this situation.... 🤔

Also, I see that when we are in the main thread, we use signals. Couldn't something similar be achieved (with subprocesses or otherwise)?

I may be misremembering as I messed with all the timeouts quite a while ago, but here is my recollection:

  • can't use signals w/ system transport due to the ptyprocess forking magic (this part of things is sort of magic to me, so I don't have a better explanation other than I think I tried it and it did nothing.... eventually I found out that the forking made the signals not work. I could be very wrong and would welcome being wrong on this!)
  • can't use signals outside of the main thread (so in the nornir use case we basically can't use signals in any scenario)

Another idea: in the case of SystemTransport (which I think is also part of the cause of this issue), we could kill the underlying SSH process when the timeout fires - this should release the read() call?

Yep, good thought, but the _handle_timeout method of the timeout decorator already closes the session, which in our case sets the transport session object to None, which of course should stop any reading. I was messing around with this yesterday... need to do a bit more experimenting here. I need to re-run, but I think basically this did not "kill" the read operation that was currently active (in my case literally just a time.sleep), and thus we still blocked for that duration.

It seems that the device stops sending it's capabilities mid-flight!

Interesting! I wonder if you could correlate that with show netconf-yang clients (or whatever the command is) and/or device logs to see if the device is telling us to go away?

So.... takeaways....

  1. Maybe you could try with transport timeout -> 0 to see if that at least works around this issue. That would hopefully "solve" this, and/or maybe show us any other issues that are present.
  2. See if the device has any relevant information to correlate with this
  3. I'm going to do more testing; I just ran out of time yesterday. I want to include testing with nornir as it swallows the plugin exceptions and wraps them -- I don't think that would cause any issues but just want to check. Plus I slept since last night when I was messing around, so I think I've already forgotten a lot of the details :)

Thanks a bunch for sticking with me on this one -- interesting issue for sure!

Carl

@netixx
Author

netixx commented May 9, 2022

A clarification regarding _handle_timeout, to make what I meant clearer.

I think we should kill the underlying ssh process in this function, before we wait on the threads (which are locked in read).
I did some testing with my setup: when the program is stuck, if I send e.g. SIGTERM to the SSH processes (using htop to identify the ssh processes and kill them), this releases the deadlock.
Could we access the PID of the process spawned by ptyprocess and kill it in _handle_timeout? This would also prevent generating zombie SSH processes.
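Something like this is what I have in mind (a very rough, untested sketch -- it assumes the vendored PtyProcess keeps a pid attribute like the upstream ptyprocess library does):

# Rough sketch of the idea only: kill the forked ssh process so the blocked read1()
# is interrupted (EOF/OSError) instead of hanging forever.
import os
import signal

from scrapli.exceptions import ScrapliTimeout

def _handle_timeout(self) -> None:
    self.transport_instance.logger.critical("transport operation timed out, closing transport")
    session = getattr(self.transport_instance, "session", None)
    pid = getattr(session, "pid", None)  # system transport: the PtyProcess child pid
    if pid is not None:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            pass  # already gone
    self.transport_instance.close()
    raise ScrapliTimeout("transport operation timed out")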

I tested with transport_timeout=0, but I still get the locking issue. The threads are locked on read1 all the same; the only difference is that I don't have an extraneous thread locked on the ThreadPool __exit__ call.

@netixx
Author

netixx commented May 9, 2022

I did some more testing/looking around on this issue.

The _handle_timeout function does in fact call close() on the transport, and for SystemTransport, it tries to kill the subprocess:

#ptyprocess.py line 404
def close(self, force: bool = True) -> None:
        """
        Close the instance

        This closes the connection with the child application. Note that
        calling close() more than once is valid. This emulates standard Python
        behavior with files. Set force to True if you want to make sure that
        the child is terminated (SIGKILL is sent if the child ignores SIGHUP
        and SIGINT).

        Args:
            force: bool

        Returns:
            None

        Raises:
            PtyProcessError: if child cannot be terminated

        """
        if not self.closed:
            self.flush()

            # in the original ptyprocess vendor'd code the file object is "gracefully" closed,
            # however in some situations it seemed to hang forever on the close call... given that
            # as soon as this connection is closed it will need to be re-opened, and that will of
            # course re-create the fileobject this seems like an ok workaround because for reasons
            # unknown to me... this does not hang (even though in theory delete method just closes
            # things...?)
            try:
                del self.fileobj
            except AttributeError:
                pass
            # Give kernel time to update process status.
            time.sleep(self.delayafterclose)
            if self.isalive():
                if not self.terminate(force):
                    raise PtyProcessError("Could not terminate the child.")
            self.fd = -1
            self.closed = True

However, upon debugging, I found that self.terminate was never called in my case...

I changed the thread pool name in the decorator to with ThreadPoolExecutor(max_workers=1, thread_name_prefix=self.transport_instance.open_cmd[21]) as pool: so that I could use the thread names to see which device was locking (index 21 is linked to my specific open_cmd). Once the program locked, I could see the locked device by looking at the thread names in the debugger. If I kill the underlying ssh process (using htop or ps + kill), using the device name (which is in the process name), the program unlocks. In this case we get scrapli.transport - CRITICAL - encountered EOF reading from transport; typically means the device closed the connection as expected (the call to read is interrupted by the OS because the process has been killed, and OSError is raised).

However, upon looking at the logs in detail, I could see a log for the decorator timeout for this device (I tuned the log to include the device name as well): transport operation timed out, closing transport.
This means that _handle_timeout is not fired.

@carlmontanari
Owner

I tested with transport_timeout=0, but I still get the locking issue. The threads are locked on read1 all the same; the only difference is that I don't have an extraneous thread locked on the ThreadPool __exit__ call.

yeah, this makes sense in hindsight -- we would still be blocking on read1, and the channel timeout wrapper would still apply. I suspect if we set both transport and channel timeouts to 0 we would see it hang and/or have some other issue...

In this case we get a scrapli.transport - CRITICAL - encountered EOF reading from transport; typically means the device closed the connection as expected (the call to read is interrupted by the OS because the process has been killed, and OSError is raised).

OK, tracking so far -- this feels "good" (in that that sounds like what it should be doing!)

However, upon looking at the logs in details, I could see a log for the decorator timeout for this device (i tuned the log to include the device name as well): transport operation timed out, closing transport.
This means that the _handle_timeout is not fired

If you see that log message wouldn't that mean it was fired?

Been a bit of a long day so maybe I'm not thinking clearly -- but it sounds like the important part of your most recent findings is that terminate is never called, which causes the process to never really get killed (as evidenced by things working when you manually kill it but not in "normal" scrapli behavior). Could it be that isalive is reporting incorrectly? If we simply skipped that isalive check and went straight to termination, does that fix the issue? If yes, I feel like just skipping that check would be reasonable, since we shouldn't really be calling close unless we want to terminate things anyway.

Thanks again for all your work on digging into this!

Carl

@netixx
Author

netixx commented May 10, 2022

Sorry, I made a typo: I meant that I did not see a log for the transport timeout in the case where read is locked (i.e. when looking back at the logs for a specific device which is in a locked state).

To rephrase as a positive statement: I now think the issue is that the _handle_timeout method is not called.
It seems that the program is stuck somewhere after the submit call (maybe in the wait?).

I was doing more tests to find out what the state of the future would be (assuming future.done() returns true when the locking occurs), but since the issue does not happen consistently, it takes a lot of time... So I made an SSH server that always mimics the device behaviour (by stopping transmission in the middle of the capabilities):

package main

import (
	"io"
	"log"
	"time"

	"github.com/gliderlabs/ssh"
	"github.com/sirupsen/logrus"
)

// based on the following logs
// server that mimics the locking process (every host will lock)
// read: b"Warning: Permanently added '########' (ECDSA) to the list of known hosts.\n"
// read: b'######            |\n*------------------------------------------------------------------------------*\n'
// read: b"#####\n"
// read: b'######.\n'
// read: b'#####'
// write: REDACTED
// write: '\n'
// read: b'\n'
// read: b'\n\ndevice banner\n\n\n'
// read: b'Password: \n'
// write: REDACTED
// write: '\n'
// read: b'\n'
// read: b'<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">\n <capabilities>\n  <capability>urn:ietf:params:netconf:base:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:candidate:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:rollback-on-error:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:validate:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:confirmed-commit:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:notification:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:interleave:1.0</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-infra-systemmib-cfg?module=Cisco-IOS-XR-infra-systemmib-cfg&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-ipv4-autorp-datatypes?module=Cisco-IOS-XR-ipv4-autorp-datatypes&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-perf-meas-cfg?module=Cisco-IOS-XR-perf-mea'
// found start of server capabilities, authentication successful

func handler(s ssh.Session) {
	logrus.WithField("subsystem", s.Subsystem()).Infof("Handle request")
	time.Sleep(100 * time.Millisecond)
	io.WriteString(s, "Banner\n\n")
	io.WriteString(s, `<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">\n <capabilities>\n  <capability>urn:ietf:params:netconf:base:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:candidate:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:rollback-on-error:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:validate:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:confirmed-commit:1.1</capability>\n  <capability>urn:ietf:params:netconf:capability:notification:1.0</capability>\n  <capability>urn:ietf:params:netconf:capability:interleave:1.0</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-infra-systemmib-cfg?module=Cisco-IOS-XR-infra-systemmib-cfg&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-ipv4-autorp-datatypes?module=Cisco-IOS-XR-ipv4-autorp-datatypes&amp;revision=2015-11-09</capability>\n  <capability>http://cisco.com/ns/yang/Cisco-IOS-XR-perf-meas-cfg?module=Cisco-IOS-XR-perf-mea`)
	time.Sleep(3600 * time.Second)
}

func main() {
	s := &ssh.Server{
		Addr:              ":2222",
		SubsystemHandlers: map[string]ssh.SubsystemHandler{
			"netconf": handler,
		},
	}

	log.Fatal(s.ListenAndServe())

}

Testing with this server (changing the 'host' dynamically in nornir so that the target address is replaced by localhost:2222 for all hosts), I found that I couldn't reproduce the issue when I had a single worker (nornir num_workers=1), but it leads to deadlock when using 2 or more as num_workers. This leads me to believe that there is some kind of race condition in the way the ThreadPoolExecutor or futures are handled (could be in the std python lib for all I know).

My thought at this point is: why use a ThreadPoolExecutor for this purpose? Wouldn't it be simpler to launch a threading.Timer that calls _handle_timeout?

I had some success with (still needs some ironing out):

import threading
def _multiprocessing_timeout(
        self, wrapped_func: Callable[..., Any], args: Any, kwargs: Any
    ) -> Any:
        """
        Multiprocessing method for timeouts; works in threads and on windows

        Args:
            wrapped_func: function being decorated
            args: function being decorated args
            kwargs: function being decorated kwargs

        Returns:
            Any: result of decorated function

        Raises:
            N/A

        """
        timeout_timer = threading.Timer(self.transport_timeout_transport, self._handle_timeout)
        timeout_timer.start()
        val = wrapped_func(*args, **kwargs)
        timeout_timer.cancel()
        return val

Along with using force=True in the close call (SIGHUP, SIGCONT and SIGINT do not cut it to kill the ssh process that is stuck) - but it needs more testing to make sure it works as expected:

self.session.close(force=True)

Thank you also for investigating this issue :)

@carlmontanari
Owner

Sorry, I made a typo: I meant that I did not see a log for the transport timeout in the case where read is locked (i.e. when looking back at the logs for a specific device which is in a locked state).

Ok, cool makes more sense!

Testing with this server (changing the 'host' dynamically in nornir so that the target address is replaced by localhost:2222 for all hosts), I found that I couldn't reproduce the issue when I had a single worker (nornir num_workers=1), but it leads to deadlock when using 2 or more as num_workers. This leads me to believe that there is some kind of race condition in the way the ThreadPoolExecutor or futures are handled (could be in the std python lib for all I know).

I'm able to reproduce without nornir, with just core scrapli (no netconf), using pretty much your same setup -- with a single future submitted to a ThreadPoolExecutor it works as we want it to; as soon as there are two futures submitted, we block forever.

My thought at this point is: why use a ThreadPoolExecutor for this purpose? Wouldn't it be simpler to launch a threading.Timer that calls _handle_timeout?

Literally didn't know this existed till you brought it up here -- but it looks nice! Doing a quick bit of playing around with it, it does seem to work better... I see it raising the ScrapliTimeout exception, but it still looks like things are hanging somewhere/somehow. I also added the force=True in system transport and that didn't seem to help much, for me at least.

.... many minutes later.... 😁

I think maybe I've found out what's up... I went w/ classes for decorators for.... I'm honestly not 100% sure why anymore -- I think it was partly just to break things up more nicely but keep things grouped together, and maybe to retain some state about the type of timeout to run or something. Regardless... I'm still not 100% sure how, but what I think is happening is that we are trying to kill the session of the same process. This works of course if there is only one, but as soon as we add the second thread/process, for whatever reason we just keep trying to kill the same process twice. You can probably (hopefully!) confirm this by just dropping a print of self.pid in the ptyprocess close method.
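Something like this quick monkeypatch is all I mean (debugging aid only; module path per the tracebacks above, and it assumes the vendored class is still named PtyProcess):

# Debugging aid: show which pid (and which thread) each ptyprocess close() acts on,
# to confirm whether two threads end up closing the same process.
import threading

from scrapli.transport.plugins.system import ptyprocess

_orig_close = ptyprocess.PtyProcess.close

def _traced_close(self, force=True):
    print(f"close() for pid={self.pid} in thread {threading.current_thread().name}")
    return _orig_close(self, force=force)

ptyprocess.PtyProcess.close = _traced_close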

The reason for bringing up the class decorators is that I did a super down and dirty version of this as a function and it looks like we do not block (this is without caring about the threading.Timer thing and just leaving the existing approach; I don't think that has anything to do with it either way).

You can try decorating the read of system transport with something like this to see if it solves the issue.

    def decorate(*args: Any, **kwargs: Any) -> Any:
        transport_instance = args[0]
        transport_timeout_transport = transport_instance._base_transport_args.timeout_transport

        def _multiprocessing_timeout(
                wrapped_func: Callable[..., Any], args: Any, kwargs: Any
        ) -> Any:
            with ThreadPoolExecutor(max_workers=1) as pool:
                future = pool.submit(wrapped_func, *args, **kwargs)
                wait([future], timeout=transport_timeout_transport)
                if not future.done():
                    transport_instance.logger.critical("transport operation timed out, closing transport")
                    transport_instance.close()
                    raise ScrapliTimeout("blah, it died")

            result = future.result()
            return result

        if not transport_timeout_transport:
            return wrapped_func(*args, **kwargs)

        return _multiprocessing_timeout(
            wrapped_func=wrapped_func,
            args=args,
            kwargs=kwargs,
        )

If that is the case, then I'm fine with going to function decorators, but I really want to find out why the class way is being difficult. I dropped some prints right before the self.transport_instance.session.close() call in the timeout function and printed the id of the transport instance, the session, and the actual pid of the ptyprocess object itself, and they were always "right" (meaning unique per thread). Yet when we got to .close in ptyprocess it was the same pid somehow.

So... maybe/hopefully there is something we can clean up in ptyprocess to make this problem go away...

That's all the brain power I have for the moment, will take another look later/tomorrow!

@carlmontanari
Owner

The reason for bringing up the class decorators is that I did a super down and dirty version of this as a function and it looks like we do not block (this is without caring about the threading.Timer thing and just leaving the existing approach; I don't think that has anything to do with it either way).

I'm like 99.9% sure I'm going to move to the threading.Timer approach you showed, if for no other reason than it seems less resource intensive than spawning threads for timeouts (but I'll test that theory first of course!). I think in the case of solving this issue it's just kind of making it look different, but I don't think it's actually changing any behavior.

I really want to find out what's up with the decorator bits, but at this point I'm leaning toward just overhauling the decorator(s) and moving them to functions. That feels kind of more "normal" for Python anyway, so maybe this is just the motivation to do that. Probably won't look at this again till the weekend, but maybe you'll find something dumb I did and fix it all before then 🤓

@netixx
Author

netixx commented May 12, 2022

I suppose the problem lies here in the original setup:

def decorate(*args: Any, **kwargs: Any) -> Any:  # type: ignore
    self.transport_instance = args[0]

Using @TransportTimeout() will create a single instance of the class for all timeout functions, and the function will then be wrapped by the __call__ method of the class. I think this means that self is shared between all calls to read.

When there is only one thread, this doesn't cause an issue. When there are multiple threads, it depends on the sequencing of the calls to read() (which executes __call__/_multiprocessing_timeout) and _handle_timeout(): if a thread calls _handle_timeout after another thread has called _multiprocessing_timeout, then we will try to close the wrong transport. This actually explains why I was seeing isalive() == False. It also explains why I wasn't seeing the logs for a given device, since I was using self.transport_instance.open_cmd to identify the device (I guess I had duplicated log lines and didn't notice).

If you want to keep the classes for looks or other reasons, then we just need to make sure to use only local variables (i.e. no references to self) in the __call__ method and the other methods of the class - though that somewhat defeats the purpose of the class :).
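A tiny standalone example of the shared-state behaviour I mean (illustration only, nothing to do with scrapli's actual code):

# Standalone illustration: a class-based decorator is instantiated once when the method
# is defined, so every thread that calls the wrapped method shares the same `self`.
import threading
import time

class SharedStateTimeout:
    def __call__(self, wrapped):
        def decorate(*args, **kwargs):
            self.instance = args[0]  # clobbered by whichever thread writes last
            time.sleep(0.01)         # widen the race window
            wrapped(*args, **kwargs)
            return args[0], self.instance
        return decorate

class Transport:
    @SharedStateTimeout()
    def read(self):
        return None

t1, t2 = Transport(), Transport()
results = []
threads = [threading.Thread(target=lambda t: results.append(t.read()), args=(t,)) for t in (t1, t2)]
for th in threads:
    th.start()
for th in threads:
    th.join()

# With two threads, `self.instance` can point at the *other* transport by the time each
# call finishes, which is exactly the "closing the wrong transport" situation above.
for real, seen in results:
    print(real is seen)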

I tested your modified decorate function and it worked, so I guess we managed to find the issue in the end!

Regarding ThreadPoolExecutor vs threading.Timer, I don't think it changes anything regarding resource (thread) consumption. However, in the first case we call the read in a new thread, and in the second we call it in the calling thread itself. I think other changes are needed if using threading.Timer, because the ScrapliTimeout will be raised in the timer thread. To optimize resources, we could devise a solution with a single "watcher" thread (for example, with a control loop that runs every X milliseconds) to which we would submit handle_timeout calls (and cancel them when read returns, as in the example I submitted) - but it would be less precise and require more work. The most visible benefit of a single "watcher" thread would be using less RAM.
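Something like this is roughly what I picture for the watcher thread (just a rough sketch of the idea, not scrapli code; the names are made up):

# Rough sketch: a single background "watcher" thread. Callers register a deadline plus a
# callback (e.g. a transport close / ssh kill) and cancel it once their read returns.
import itertools
import threading
import time

class TimeoutWatcher:
    def __init__(self, resolution: float = 0.1) -> None:
        self._resolution = resolution
        self._lock = threading.Lock()
        self._pending = {}  # handle -> (deadline, callback)
        self._ids = itertools.count()
        threading.Thread(target=self._loop, daemon=True).start()

    def add(self, timeout: float, callback) -> int:
        handle = next(self._ids)
        with self._lock:
            self._pending[handle] = (time.monotonic() + timeout, callback)
        return handle

    def cancel(self, handle: int) -> None:
        with self._lock:
            self._pending.pop(handle, None)

    def _loop(self) -> None:
        while True:
            now = time.monotonic()
            with self._lock:
                expired = [h for h, (deadline, _) in self._pending.items() if deadline <= now]
                callbacks = [self._pending.pop(h)[1] for h in expired]
            for callback in callbacks:
                callback()  # e.g. transport.close() or killing the ssh pid
            time.sleep(self._resolution)

# usage sketch:
#   watcher = TimeoutWatcher()
#   handle = watcher.add(30.0, transport.close)
#   ... blocking read ...
#   watcher.cancel(handle)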

@carlmontanari
Owner

Using @TransportTimeout() will create a single instance of the class for all timeout functions, and the function will then be wrapped by the __call__ method of the class. I think this means that self is shared between all calls to read.

Yeah it totally all makes sense now! Hard way to learn it, but I guess I will remember it now! I think I got it in my head that every call would be a new instance, but obviously not!

I tested your modified decorate function and it worked, so I guess we managed to find the issue in the end!

Awesome. I'm going to mess around and see how I want to go forward with fixing this. Keep classes but do it right, or ditch them and just go with functions etc... will need to update tests and such so may take a bit to get dialed in.

Regarding ThreadPoolExecutor vs threading.Timer, I don't think it changes anything regarding resource

Makes sense -- I'm not too flustered about it... especially at the moment. Longer term it could be neat to improve the efficiency of things here, because I think this is the weakest part of scrapli -- or, put a better way, the least efficient part. But.... that's for another day 😁

Will keep ya posted and defo will make sure that this gets sorted and merged before the next release (2022.06.30).

Thank you a ton for all the help on debugging and talking through this, it's been a fun one!

Carl

@carlmontanari
Owner

Going to close this -- if you have a few minutes and can give the PR over in #237 a try, that would be great! Thanks again for all the help on this one -- I ended up just smashing everything down into simple function decorators, so it should just work ™️ now!

Carl

@spdkils

spdkils commented Sep 20, 2022

I have, I believe, the exact same problem with asyncssh, as well as with paramiko.

I found the "easy" way to deadlock, is add an autocommand to a vty, and watch it die.

line vty 0
 autocommand who wide

Infinite hangs, stuck in read loop forever.
