Skip to content

Commit

Permalink
EngineClient: I/O streams
Browse files Browse the repository at this point in the history
Major internal change to support versatile I/O configuration per engine
client (not only stdout/stderr readers and stdin writer). Created
EngineClientStream class representing the stream being managed by an
Engine.

Change-Id: I36cab5b1df8460732f830d99c781cdce3f99637a
  • Loading branch information
Stephane Thiell committed Jul 20, 2014
1 parent 6ea06cc commit dc429bb
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 665 deletions.
6 changes: 6 additions & 0 deletions ChangeLog
@@ -1,3 +1,9 @@
2014-05-20 S. Thiell <stephane.thiell@cea.fr>

* EngineClient.py: Code improvement to support multiple customizable I/O
streams per EngineClient in different mode, each being named and having
their own read/write buffers and attributes.

2014-04-30 A. Degremont <aurelien.degremont@cea.fr>

* Clush.py: Add a 'worker' option to switch default worker (ticket #221).
Expand Down
82 changes: 33 additions & 49 deletions lib/ClusterShell/Engine/EPoll.py
Expand Up @@ -41,7 +41,7 @@
import select
import time

from ClusterShell.Engine.Engine import Engine
from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE
from ClusterShell.Engine.Engine import EngineNotSupportedError
from ClusterShell.Engine.Engine import EngineTimeoutException
from ClusterShell.Worker.EngineClient import EngineClientEOF
Expand Down Expand Up @@ -72,13 +72,11 @@ def release(self):
self.epolling.close()

def _register_specific(self, fd, event):
"""
Engine-specific fd registering. Called by Engine register.
"""
eventmask = 0
if event & (Engine.E_READ | Engine.E_ERROR):
"""Engine-specific fd registering. Called by Engine register."""
if event & E_READ:
eventmask = select.EPOLLIN
elif event == Engine.E_WRITE:
else:
assert event & E_WRITE
eventmask = select.EPOLLOUT

self.epolling.register(fd, eventmask)
Expand All @@ -101,13 +99,7 @@ def _modify_specific(self, fd, event, setvalue):
self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
setvalue))
if setvalue:
eventmask = 0
if event & (Engine.E_READ | Engine.E_ERROR):
eventmask = select.EPOLLIN
elif event == Engine.E_WRITE:
eventmask = select.EPOLLOUT

self.epolling.register(fd, eventmask)
self._register_specific(fd, event)
else:
self.epolling.unregister(fd)

Expand Down Expand Up @@ -145,70 +137,62 @@ def runloop(self, timeout):
for fd, event in evlist:

# get client instance
client, fdev = self._fd2client(fd)
client, stream = self._fd2client(fd)
if client is None:
continue

fdev = stream.evmask
fname = stream.name

# set as current processed client
self._current_client = client

# check for poll error condition of some sort
if event & select.EPOLLERR:
self._debug("EPOLLERR %s" % client)
client._close_writer()
self._debug("EPOLLERR fd=%d fname=%s fdev=0x%x (%s)" % \
(fd, fname, fdev, client))
assert fdev & E_WRITE
self.remove_stream(client, stream)
self._current_client = None
continue

# check for data to read
if event & select.EPOLLIN:
#self._debug("EPOLLIN fd=%d %s" % (fd, client))
assert fdev & (Engine.E_READ | Engine.E_ERROR)
assert client._events & fdev
self.modify(client, 0, fdev)
assert fdev & E_READ
assert stream.events & fdev, (stream.events, fdev)
self.modify(client, fname, 0, fdev)
try:
if fdev & Engine.E_READ:
client._handle_read()
else:
client._handle_error()
client._handle_read(fname)
except EngineClientEOF:
self._debug("EngineClientEOF %s" % client)
if fdev & Engine.E_READ:
self.remove(client)
self._debug("EngineClientEOF %s %s" % (client, fname))
self.remove_stream(client, stream)
self._current_client = None
continue

# or check for end of stream (do not handle both at the same
# time because handle_read() may perform a partial read)
elif event & select.EPOLLHUP:
self._debug("EPOLLHUP fd=%d %s (r%s,e%s,w%s)" % (fd,
client.__class__.__name__, client.fd_reader,
client.fd_error, client.fd_writer))
if fdev & Engine.E_READ:
if client._events & Engine.E_ERROR:
self.modify(client, 0, fdev)
else:
self.remove(client)
else:
if client._events & Engine.E_READ:
self.modify(client, 0, fdev)
else:
self.remove(client)
assert fdev & E_READ, "fdev 0x%x & E_READ" % fdev
self._debug("EPOLLHUP fd=%d fname=%s %s (%s)" % \
(fd, fname, client, client.streams))
self.remove_stream(client, stream)
self._current_client = None
continue

# check for writing
if event & select.EPOLLOUT:
self._debug("EPOLLOUT fd=%d %s (r%s,e%s,w%s)" % (fd,
client.__class__.__name__, client.fd_reader,
client.fd_error, client.fd_writer))
assert fdev == Engine.E_WRITE
assert client._events & fdev
self.modify(client, 0, fdev)
client._handle_write()
self._debug("EPOLLOUT fd=%d fname=%s %s (%s)" % \
(fd, fname, client, client.streams))
assert fdev & E_WRITE
assert stream.events & fdev, (stream.events, fdev)
self.modify(client, fname, 0, fdev)
client._handle_write(fname)

self._current_client = None

# apply any changes occured during processing
if client.registered:
self.set_events(client, client._new_events)
self.set_events(client, stream)

# check for task runloop timeout
if timeout > 0 and time.time() >= start_time + timeout:
Expand Down

0 comments on commit dc429bb

Please sign in to comment.