Skip to content

Commit

Permalink
Merge 4eca4ed into 51b147f
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Sep 3, 2019
2 parents 51b147f + 4eca4ed commit ef20a11
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Change History
calcout: add support for calcout record & userCalcOuts
* `#237 <https://github.com/prjemian/spec2nexus/issues/237>`_
epid: add support for epid record
* `#234 <https://github.com/prjemian/spec2nexus/issues/234>`_
utils: replicate the `unix()` command
* `#230 <https://github.com/prjemian/spec2nexus/issues/230>`_
signals: resolve TypeError

Expand Down
60 changes: 58 additions & 2 deletions apstools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
~text_encode
~to_unicode_or_bust
~trim_string_for_EPICS
~unix
~unix_cmd
"""
Expand Down Expand Up @@ -58,6 +59,7 @@
import re
import smtplib
import subprocess
import sys
import threading
import time
import xlrd
Expand Down Expand Up @@ -569,8 +571,62 @@ def trim_string_for_EPICS(msg):
return msg


def unix_cmd(command_list):
"""run a UNIX command, returns (stdout, stderr)"""
def unix(command, raises=True):
"""
run a UNIX command, returns (stdout, stderr)
PARAMETERS
command: str
UNIX command to be executed
raises: bool
If `True`, will raise exceptions as needed,
default: `True`
"""
if sys.platform not in ("linux", "linux2"):
emsg = f"Cannot call unix() when OS={sys.platform}"
raise RuntimeError(emsg)

process = subprocess.Popen(
command,
shell=True,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
)

stdout, stderr = process.communicate()

if len(stderr) > 0:
emsg = f"unix({command}) returned error:\n{stderr}"
logger.error(emsg)
if raises:
raise RuntimeError(emsg)

return stdout, stderr


def unix_cmd(command_list, raises=True):
"""
run a UNIX command, returns (stdout, stderr)
(deprecated): use ``unix()`` instead
PARAMETERS
command_list: [str]
UNIX command, divided into a list of strings
raises: bool
If `True`, will raise `RuntimeWarning()`
since this function is deprecated,
default: `True`
"""
msg = "'unix_cmd()' is deprecated"
msg += ", use 'unix()' instead, slightly different API"
logger.warning(msg)
if raises:
raise RuntimeWarning(msg)

process = subprocess.Popen(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
return stdout, stderr
Expand Down
97 changes: 61 additions & 36 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
simple unit tests for this package
"""

import ophyd.sim
import os
import sys
import time
import unittest

_path = os.path.dirname(__file__)
_path = os.path.join(_path, '..')
_path = os.path.join(os.path.dirname(__file__), '..')
if _path not in sys.path:
sys.path.insert(0, _path)

Expand All @@ -22,11 +23,11 @@

class Test_Utils(unittest.TestCase):

def setUp(self):
pass

def tearDown(self):
pass
# def setUp(self):
# pass
#
# def tearDown(self):
# pass

def test_cleanupText(self):
original = "1. Some text to cleanup #25"
Expand All @@ -35,29 +36,38 @@ def test_cleanupText(self):
self.assertEqual(received, expected)

def test_device_read2table(self):
from ophyd.sim import motor1
table = APS_utils.device_read2table(motor1, show_ancient=True, use_datetime=True)
motor1 = ophyd.sim.hw().motor1
table = APS_utils.device_read2table(
motor1, show_ancient=True, use_datetime=True)
# print(table)
expected = """
=============== =====
name value
=============== =====
motor1 0
motor1_setpoint 0
=============== =====
""".strip()
expected = (
"=============== =====\n"
"name value\n"
"=============== =====\n"
"motor1 0 \n"
"motor1_setpoint 0 \n"
"=============== ====="
)
# TODO: figure out how to compare with timestamps
received = "\n".join([v[:21] for v in str(table).strip().splitlines()])
received = "\n".join([
v[:21]
for v in str(table).strip().splitlines()])
self.assertEqual(received, expected) # fails since timestamps do not match

table = APS_utils.device_read2table(motor1, show_ancient=True, use_datetime=False)
table = APS_utils.device_read2table(
motor1, show_ancient=True, use_datetime=False)
# expected = """ """.strip()
received = "\n".join([v[:21] for v in str(table).strip().splitlines()])
received = "\n".join(
[v[:21]
for v in str(table).strip().splitlines()])
self.assertEqual(received, expected) # fails since timestamps do not match

table = APS_utils.device_read2table(motor1, show_ancient=False, use_datetime=False)
table = APS_utils.device_read2table(
motor1, show_ancient=False, use_datetime=False)
# expected = """ """.strip()
received = "\n".join([v[:21] for v in str(table).strip().splitlines()])
received = "\n".join([
v[:21]
for v in str(table).strip().splitlines()])
self.assertEqual(received, expected) # fails since timestamps do not match

def test_dictionary_table(self):
Expand All @@ -76,18 +86,18 @@ def test_dictionary_table(self):
}
table = APS_utils.dictionary_table(md)
received = str(table).strip()
expected = """
=========== =============================================================================
key value
=========== =============================================================================
beamline_id developer
login_id jemian:wow.aps.anl.gov
pid 19072
proposal_id None
scan_id 10
version {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}
=========== =============================================================================
""".strip()
expected = (
"=========== =============================================================================\n"
"key value \n"
"=========== =============================================================================\n"
"beamline_id developer \n"
"login_id jemian:wow.aps.anl.gov \n"
"pid 19072 \n"
"proposal_id None \n"
"scan_id 10 \n"
"version {'bluesky': '1.5.2', 'ophyd': '1.3.3', 'apstools': '1.1.5', 'epics': '3.3.3'}\n"
"=========== ============================================================================="
)
self.assertEqual(received, expected)

def test_itemizer(self):
Expand Down Expand Up @@ -173,17 +183,32 @@ def test_show_ophyd_symbols(self):
self.assertTrue(k in rr, msg)
self.assertEqual(num, len(table.rows))

def test_unix(self):
cmd = 'echo "hello"'
out, err = APS_utils.unix(cmd)
self.assertEqual(out, b'hello\n')
self.assertEqual(err, b"")

cmd = "sleep 0.8 | echo hello"
t0 = time.time()
out, err = APS_utils.unix(cmd)
dt = time.time() - t0
self.assertGreaterEqual(dt, 0.8)
self.assertEqual(out, b'hello\n')
self.assertEqual(err, b"")


class Test_With_Database(unittest.TestCase):

def setUp(self):
from tests.test_export_json import get_db
self.db = get_db()

def tearDown(self):
pass
# def tearDown(self):
# pass

def test_list_recent_scans(self):
# TODO: capture output to ?stderr? and test for it
with self.assertRaises(RuntimeWarning):
APS_utils.list_recent_scans(db=self.db, raises=True)

Expand Down

0 comments on commit ef20a11

Please sign in to comment.