forked from KasperskyLab/RAM
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ref: move ram.capture to ram.console.capture
- Loading branch information
1 parent
5e22adf
commit 86ad8ab
Showing
3 changed files
with
215 additions
and
156 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,163 +1,9 @@ | ||
#!/usr/bin/python | ||
|
||
import os | ||
import sys | ||
import fcntl | ||
|
||
# issues | ||
# | ||
# scenario: service ntpd restart | ||
# sub-daemons inherite pipe descritors -- use fd_cloexec flag for pipe descriptors | ||
# | ||
# scenario: sh -c 'read -p "Press ENTER to continue"' | ||
# no line endings in output -- use integer buffering with buffer size=1. by default line buffering is used | ||
# | ||
# limitations | ||
# | ||
# captured output doesn't catch line end generated by user input | ||
# captured output doesn't block caller. re-output could appeared over snack forms | ||
|
||
class Capture(object): | ||
def __init__(self, buffered=None, handlers=None): | ||
try: | ||
self.r, self.w = os.openpty() | ||
except OSError: | ||
self.r, self.w = os.pipe() | ||
self.od_stdout = sys.__stdout__.fileno() | ||
self.od_stderr = sys.__stderr__.fileno() | ||
|
||
self.buffered = buffered | ||
|
||
if handlers is None: | ||
handlers = [] | ||
self.handlers = handlers | ||
|
||
def __capture(self, orig, swap): | ||
copy = os.dup(orig) | ||
|
||
flag = fcntl.fcntl(copy, fcntl.F_GETFL) | fcntl.FD_CLOEXEC | ||
fcntl.fcntl(copy, fcntl.F_SETFD, flag) | ||
|
||
os.dup2(swap, orig) | ||
|
||
return copy | ||
|
||
def __restore(self, orig, copy): | ||
os.dup2(copy, orig) | ||
os.close(copy) | ||
|
||
def __enter__(self): | ||
self.child_pid = os.fork() | ||
|
||
if self.child_pid: | ||
os.close(self.r) | ||
|
||
self.nd_stdout = self.__capture(self.od_stdout, self.w) | ||
self.nd_stderr = self.__capture(self.od_stderr, self.w) | ||
|
||
self.sys_stdout, sys.stdout = sys.stdout, os.fdopen(self.od_stdout, 'w', 0) | ||
self.sys_stderr, sys.stderr = sys.stderr, os.fdopen(self.od_stderr, 'w', 0) | ||
|
||
os.close(self.w) | ||
else: | ||
os.close(self.w) | ||
|
||
try: | ||
fobj = os.fdopen(self.r) | ||
|
||
if self.buffered: | ||
while fobj: | ||
data = fobj.read(self.buffered) | ||
if not data: | ||
fobj = None | ||
else: | ||
self(data) | ||
else: | ||
for line in iter(fobj.readline, ''): | ||
self(line) | ||
finally: | ||
os._exit(0) | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
if self.child_pid: | ||
sys.stderr, self.sys_stderr = self.sys_stderr, None | ||
sys.stdout, self.sys_stdout = self.sys_stdout, None | ||
|
||
self.__restore(self.od_stderr, self.nd_stderr) | ||
self.__restore(self.od_stdout, self.nd_stdout) | ||
|
||
os.waitpid(self.child_pid, 0) | ||
else: | ||
raise RuntimeError("Shouldn't exit context in child!") | ||
|
||
def __call__(self, chars): | ||
def _stdout(chars): | ||
sys.stdout.write(chars) | ||
sys.stdout.flush() | ||
|
||
for handler in self.handlers + [_stdout]: | ||
if chars is None: | ||
break | ||
else: | ||
chars = handler(chars) | ||
|
||
|
||
class FancyLine(object): | ||
def __init__(self, fancy1=None, fancy2=None): | ||
self.linebreak = True | ||
|
||
if fancy1 is None: | ||
self.fancy1 = '' | ||
else: | ||
self.fancy1 = fancy1 | ||
|
||
if fancy2 is None: | ||
self.fancy2 = self.fancy1 | ||
else: | ||
self.fancy2 = fancy2 | ||
|
||
def __call__(self, chars): | ||
ready = '' | ||
for char in chars: | ||
if self.linebreak: | ||
ready += self.fancy1 | ||
self.linebreak = False | ||
if char == '\n': | ||
ready += self.fancy2 | ||
self.linebreak = True | ||
if char == '\r': | ||
continue | ||
ready += char | ||
return ready | ||
import ram.console | ||
|
||
|
||
class __api__(object): | ||
def __call__(self, *args, **kwargs): | ||
return Capture(*args, **kwargs) | ||
|
||
|
||
if __name__ == '__main__': | ||
import time | ||
import subprocess | ||
|
||
with Capture(handlers=[FancyLine('<<<', '>>>')]): | ||
with Capture(1, handlers=[FancyLine(' *** ')]): | ||
if sys.argv[1:]: | ||
subprocess.call(sys.argv[1:]) | ||
else: | ||
for _ in xrange(4,5): | ||
print _ | ||
time.sleep(1) | ||
|
||
print 'Zello!' | ||
print >> sys.stdout, 'Pello!' | ||
os.system("sh -c 'for i in `seq 1 3`; do echo $i; sleep 1; done'") | ||
|
||
os.system('echo Shell') | ||
os.system('echo Sherr >&2') | ||
os.system('echo Shred >/dev/stderr') | ||
|
||
subprocess.call('echo Subps', shell=True) | ||
subprocess.call(['echo', 'Subss']) | ||
|
||
print >> sys.stderr, 'Error!' | ||
return ram.console.capture(*args, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#!/usr/bin/python | ||
|
||
|
||
try: | ||
import unittest | ||
unittest.TextTestResult | ||
except AttributeError: | ||
import unittest2 as unittest | ||
unittest.TextTestResult | ||
|
||
|
||
import os | ||
import sys | ||
import subprocess | ||
|
||
import ram.console | ||
|
||
|
||
class CaptureTestCase(unittest.TestCase): | ||
|
||
def setUp(self): | ||
self.bufferize = ram.console.Bufferize() | ||
self.arrowline = ram.console.FancyLine('<<<', '>>>') | ||
self.starsline = ram.console.FancyLine(' *** ') | ||
|
||
def test_capture_buffer(self): | ||
with ram.console.capture(0, handlers=[self.bufferize]): | ||
with ram.console.capture(0, handlers=[self.arrowline]): | ||
with ram.console.capture(1, handlers=[self.starsline]): | ||
print 'Start' | ||
|
||
print >> sys.stdout, 'Aimed' | ||
os.system("sh -c 'echo Subsh'") | ||
|
||
os.system('echo Shell') | ||
os.system('echo Sherr >&2') | ||
os.system('echo Shred >/dev/stderr') | ||
|
||
subprocess.call('echo Subps', shell=True) | ||
subprocess.call('echo Subed >&2', shell=True) | ||
subprocess.call(['echo', 'Subss']) | ||
|
||
print >> sys.stderr, 'Error' | ||
|
||
lines = list(self.bufferize) | ||
assert len(lines) == 10 | ||
|
||
assert lines[0] == "<<< *** Start *** >>>" | ||
assert lines[1] == "<<< *** Aimed *** >>>" | ||
assert lines[2] == "<<< *** Subsh *** >>>" | ||
assert lines[3] == "<<< *** Shell *** >>>" | ||
assert lines[4] == "<<< *** Sherr *** >>>" | ||
assert lines[5] == "<<< *** Shred *** >>>" | ||
assert lines[6] == "<<< *** Subps *** >>>" | ||
assert lines[7] == "<<< *** Subed *** >>>" | ||
assert lines[8] == "<<< *** Subss *** >>>" | ||
assert lines[9] == "<<< *** Error *** >>>" | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |