Merge branch 'yan-981-improve-dlg-logging'
Signed-off-by: Rodrigo Tobar <rtobar@icrar.org>
rtobar committed Jun 15, 2022
2 parents a86dcac + 675d148 commit 751531e
Showing 23 changed files with 92 additions and 82 deletions.
3 changes: 3 additions & 0 deletions .pylintrc
@@ -0,0 +1,3 @@
+ [Main]
+ disable=all
+ enable=logging-not-lazy,logging-format-interpolation
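
Note: these two checks drive every change in this commit. `logging-not-lazy` flags messages built eagerly with `%` or string concatenation inside a logging call, and `logging-format-interpolation` flags `str.format()` calls; both patterns format the message even when the log level is disabled. A minimal sketch of what they reject and accept (the `name`/`size` variables are illustrative only):

    import logging

    logger = logging.getLogger(__name__)
    name, size = "drop-1", 42

    logger.info("Loaded %s (%d bytes)" % (name, size))      # flagged: logging-not-lazy
    logger.info("Loaded " + name)                           # flagged: logging-not-lazy
    logger.info("Loaded {} ({} bytes)".format(name, size))  # flagged: logging-format-interpolation
    logger.info("Loaded %s (%d bytes)", name, size)         # OK: interpolated lazily
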
9 changes: 9 additions & 0 deletions .travis.yml
@@ -74,6 +74,15 @@ jobs:
script:
- READTHEDOCS=True make -C docs html SPHINXOPTS="-W --keep-going"

+ # Run pylint to enforce our (few) rules
+ - name: pylint
+ python: "3.9"
+ before_install:
+ install:
+ - pip install pylint
+ script:
+ - pylint daliuge-common daliuge-translator daliuge-engine
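
The same gate can be reproduced outside Travis; a minimal sketch using pylint's programmatic entry point, assuming a reasonably recent pylint is installed and the three package directories sit in the current working directory:

    from pylint.lint import Run

    # exit=False returns control to us instead of calling sys.exit()
    result = Run(
        ["daliuge-common", "daliuge-translator", "daliuge-engine"],
        exit=False,
    )
    print("pylint exit status:", result.linter.msg_status)  # non-zero if a check fired
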


# We want to use docker during the tests
services:
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/bash_shell_app.py
@@ -319,7 +319,7 @@ def execute(self, data):
drop_state = DROPStates.COMPLETED
execStatus = AppDROPStates.FINISHED
except:
logger.exception("Error while executing %r" % (self,))
logger.exception("Error while executing %r", self)
drop_state = DROPStates.ERROR
execStatus = AppDROPStates.ERROR
finally:
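
`logger.exception` accepts the same lazy %-style arguments as the other logging methods; it logs at ERROR level and appends the traceback of the exception currently being handled. A standalone sketch (the "task-42" value is illustrative only):

    import logging

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("demo")

    try:
        1 / 0
    except ZeroDivisionError:
        # Message plus lazy argument; the traceback is appended automatically
        logger.exception("Error while executing %r", "task-42")
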
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/apps/dynlib.py
@@ -213,7 +213,7 @@ def load_and_init(libname, oid, uid, params):
libname = find_library(libname) or libname

lib = ctypes.cdll.LoadLibrary(libname)
logger.info("Loaded {} as {!r}".format(libname, lib))
logger.info("Loaded %s as %r", libname, lib)

one_of_functions = [["run", "run2"], ["init", "init2"]]
for functions in one_of_functions:
@@ -251,7 +251,7 @@ def load_and_init(libname, oid, uid, params):

if hasattr(lib, "init2"):
# With init2 we pass the params as a PyObject*
logger.info("Extra parameters passed to application: {}".format(params))
logger.info("Extra parameters passed to application: %r", params)
init2 = lib.init2
init2.restype = ctypes.py_object
result = init2(ctypes.pointer(c_app), ctypes.py_object(params))
@@ -527,7 +527,7 @@ def run(self):
logger.info("Subprocess %s", step)
error = get_from_subprocess(self.proc, queue)
if error is not None:
logger.error("Error in sub-process when " + step)
logger.error("Error in sub-process when %s", step)
raise error
finally:
self.proc.join(self.timeout)
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/pyfunc.py
@@ -237,8 +237,8 @@ def _init_func_defaults(self):
# we came all this way, now assume that any resulting dict is correct
if not isinstance(self.func_defaults, dict):
logger.error(
f"Wrong format or type for function defaults for "
+ "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}"
"Wrong format or type for function defaults for %s: %r, %r",
self.f.__name__, self.func_defaults, type(self.func_defaults)
)
raise ValueError
if self.input_parser is DropParser.PICKLE:
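
This hunk fixes a genuine bug as well as the style: only the first literal carried the f-prefix, so the placeholders in the concatenated second string were never interpolated. A standalone illustration (with a hypothetical `name` variable):

    name = "my_func"
    # The second literal lacks the f-prefix, so its braces are emitted verbatim
    message = f"Wrong format or type for function defaults for " + "{name}"
    print(message)  # -> Wrong format or type for function defaults for {name}
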
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/dask_emulation.py
@@ -284,7 +284,7 @@ def __init__(self, f, nout):
self.fcode, self.fdefaults = pyfunc.serialize_func(f)
self.original_kwarg_names = []
self.nout = nout
logger.debug("Created %r" % self)
logger.debug("Created %r", self)

def make_dropdict(self):

12 changes: 6 additions & 6 deletions daliuge-engine/dlg/deploy/dlg_proxy.py
@@ -110,7 +110,7 @@ def connect_to_host(self, server, port):
try:
the_socket = socket.create_connection((server, port))
the_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
logger.info("Connected to %s on port %d" % (server, port))
logger.info("Connected to %s on port %d", server, port)
return the_socket
except Exception:
logger.exception("Failed to connect to %s:%d", server, port)
@@ -173,7 +173,7 @@ def loop(self):
continue
else:
tag = data[0:at]
logger.debug("Received {0} from Monitor".format(tag))
logger.debug("Received %s from Monitor", b2s(tag))
dlg_sock = self._dlg_sock_dict.get(tag, None)
to_send = data[at + dl :]
if dlg_sock is None:
@@ -191,21 +191,21 @@ def loop(self):
if send_to_dlg:
try:
dlg_sock.sendall(to_send)
logger.debug("Sent {0} to DALiuGE manager".format(tag))
logger.debug("Sent %s to DALiuGE manager", b2s(tag))
except socket.error:
self.close_dlg_socket(dlg_sock, tag)
else:
# from one of the DALiuGE sockets
data = the_socket.recv(BUFF_SIZE)
tag = self._dlg_sock_tag_dict.get(the_socket, None)
logger.debug("Received {0} from DALiuGE manager".format(b2s(tag)))
logger.debug("Received %s from DALiuGE manager", b2s(tag))
if tag is None:
logger.error(
"Tag for DALiuGE socket {0} is gone".format(the_socket)
"Tag for DALiuGE socket %r is gone", the_socket
)
else:
send_to_monitor(self.monitor_socket, delimit.join([tag, data]))
logger.debug("Sent {0} to Monitor".format(b2s(tag)))
logger.debug("Sent %s to Monitor", b2s(tag))
if len(data) == 0:
self.close_dlg_socket(the_socket, tag)

28 changes: 14 additions & 14 deletions daliuge-engine/dlg/deploy/utils/monitor_replayer.py
@@ -88,7 +88,7 @@ def __init__(self, graph_path, status_path):
self.gnid_ip_dict = dict()
self.status_path = status_path
with open(graph_path) as f:
logger.info("Loading graph from file {0}".format(graph_path))
logger.info("Loading graph from file %s", graph_path)
self.pg_spec = json.load(f)["g"]
for i, dropspec in enumerate(self.pg_spec.values()):
gnid = str(i)
@@ -142,7 +142,7 @@ def parse_status(self, gexf_file, out_dir=None, remove_gexf=False):
out_dir = os.path.dirname(gexf_file)
with open(gexf_file) as gf:
gexf_list = gf.readlines()
logger.info("Gexf file '{0}' loaded".format(gexf_file))
logger.info("Gexf file '%s' loaded", gexf_file)
with open(self.status_path) as f:
for i, line in enumerate(f):
colour_dict = dict()
@@ -183,14 +183,14 @@ def parse_status(self, gexf_file, out_dir=None, remove_gexf=False):
# fo.write('{0}{1}'.format(new_line, os.linesep))
# fo.write('{0}{1}'.format(new_line, '\n'))
fo.write(new_line)
logger.info("GEXF file '{0}' is generated".format(new_gexf))
logger.info("GEXF file '%s' is generated", new_gexf)
new_png = new_gexf.split(".gexf")[0] + ".png"
cmd = "{0} {1} {2}".format(java_cmd, new_gexf, new_png)
ret = commands.getstatusoutput(cmd)
if ret[0] != 0:
logger.error(
"Fail to print png from %s to %s: %s"
% (new_gexf, new_png, ret[1])
"Fail to print png from %s to %s: %s",
new_gexf, new_png, ret[1]
)
del colour_dict
if remove_gexf:
@@ -259,7 +259,7 @@ def get_state_changes(self, gexf_file, grep_log_file, steps=400, out_dir=None):
state = line.split()[-1]
fo.write("{0},{1},{2},{3}".format(ts, oid, state, os.linesep))
else:
logger.info("csv file already exists: {0}".format(csv_file))
logger.info("csv file already exists: %s", csv_file)

if not os.path.exists(sqlite_file):
sql = sql_create_status.format(csv_file)
@@ -269,10 +269,10 @@ def get_state_changes(self, gexf_file, grep_log_file, steps=400, out_dir=None):
cmd = "sqlite3 {0} < {1}".format(sqlite_file, sql_file)
ret = commands.getstatusoutput(cmd)
if ret[0] != 0:
logger.error("fail to create sqlite: {0}".format(ret[1]))
logger.error("fail to create sqlite: %s", ret[1])
return
else:
logger.info("sqlite file already exists: {0}".format(sqlite_file))
logger.info("sqlite file already exists: %s", sqlite_file)

dbconn = dbdrv.connect(sqlite_file)
q = "SELECT min(ts) from ac"
@@ -299,10 +299,10 @@ def get_state_changes(self, gexf_file, grep_log_file, steps=400, out_dir=None):
a = el
b = lr[i + 1]
step_name = "{0}-{1}".format(a, b)
logger.debug("stepname: %s" % step_name)
logger.debug("stepname: %s", step_name)
new_gexf = "{0}/{1}.gexf".format(out_dir, step_name)
if os.path.exists(new_gexf):
logger.info("{0} already exists, ignore".format(new_gexf))
logger.info("%s already exists, ignore", new_gexf)
last_gexf = new_gexf
continue
sql = sql_query.format(a, b)
@@ -332,18 +332,18 @@ def get_state_changes(self, gexf_file, grep_log_file, steps=400, out_dir=None):
colour.attrib["g"] = "{0}".format(g)
colour.attrib["b"] = "{0}".format(b)
tree.write(new_gexf)
logger.info("GEXF file '{0}' is generated".format(new_gexf))
logger.info("GEXF file '%s' is generated", new_gexf)
del drop_dict
if not filecmp.cmp(last_gexf, new_gexf, False):
new_png = new_gexf.split(".gexf")[0] + ".png"
cmd = "{0} {1} {2}".format(java_cmd, new_gexf, new_png)
ret = commands.getstatusoutput(cmd)
else:
logger.info("Identical {0} == {1}".format(new_gexf, last_gexf))
logger.info("Identical %s == %s", new_gexf, last_gexf)
last_gexf = new_gexf
if ret[0] != 0:
logger.error(
"Fail to print png from %s to %s: %s" % (last_gexf, new_png, ret[1])
"Fail to print png from %s to %s: %s", last_gexf, new_png, ret[1]
)

def build_drop_subgraphs(self, node_range="[0:20]"):
@@ -519,7 +519,7 @@ def build_drop_fullgraphs(self, do_subgraph=False, graph_lib="pygraphviz"):
logging.basicConfig(filename=options.log_file, level=logging.DEBUG, format=FORMAT)

if options.edgelist and options.dot_file is not None:
logger.info("Loading networx graph from file {0}".format(options.graph_path))
logger.info("Loading networx graph from file %s", options.graph_path)
gp = GraphPlayer(options.graph_path, options.status_path)
g = gp.build_drop_fullgraphs(graph_lib="networkx")
nx.write_edgelist(g, options.dot_file)
31 changes: 15 additions & 16 deletions daliuge-engine/dlg/drop.py
@@ -468,8 +468,7 @@ def _getArg(self, kwargs, key, default):
val = default
if key in kwargs:
val = kwargs.pop(key)
- elif logger.isEnabledFor(logging.DEBUG):
- logger.debug("Defaulting %s to %s in %r" % (key, str(val), self))
+ logger.debug("Defaulting %s to %s in %r", key, str(val), self)
return val

def __hash__(self):
@@ -695,7 +694,7 @@ def commit(self):
# Set as committed
self._committed = True
else:
logger.debug("Trying to re-commit DROP %s, cannot overwrite." % self)
logger.debug("Trying to re-commit DROP %s, cannot overwrite.", self)

@property
def oid(self):
@@ -852,7 +851,7 @@ def parent(self):
def parent(self, parent):
if self._parent and parent:
logger.warning(
"A parent is already set in %r, overwriting with new value" % (self,)
"A parent is already set in %r, overwriting with new value", self
)
if parent:
prevParent = self._parent
@@ -1048,8 +1047,8 @@ def addStreamingConsumer(self, streamingConsumer, back=True):
if scuid in self._streamingConsumers_uids:
return
logger.debug(
"Adding new streaming streaming consumer for %r: %s"
% (self, streamingConsumer)
"Adding new streaming streaming consumer for %r: %s",
self, streamingConsumer
)
self._streamingConsumers.append(streamingConsumer)

@@ -1242,7 +1241,7 @@ def open(self, **kwargs):
)

io = self.getIO()
logger.debug("Opening drop %s" % (self.oid))
logger.debug("Opening drop %s", self.oid)
io.open(OpenMode.OPEN_READ, **kwargs)

# Save the IO object in the dictionary and return its descriptor instead
@@ -1344,8 +1343,8 @@ def write(self, data: Union[bytes, memoryview], **kwargs):
if nbytes != dataLen:
# TODO: Maybe this should be an actual error?
logger.warning(
"Not all data was correctly written by %s (%d/%d bytes written)"
% (self, nbytes, dataLen)
"Not all data was correctly written by %s (%d/%d bytes written)",
self, nbytes, dataLen
)

# see __init__ for the initialization to None
@@ -1371,12 +1370,12 @@ def write(self, data: Union[bytes, memoryview], **kwargs):
else:
if remaining < 0:
logger.warning(
"Received and wrote more bytes than expected: "
+ str(-remaining)
"Received and wrote more bytes than expected: %d",
-remaining
)
logger.debug(
"Automatically moving %r to COMPLETED, all expected data arrived"
% (self,)
"Automatically moving %r to COMPLETED, all expected data arrived",
self
)
self.setCompleted()
else:
@@ -1656,7 +1655,7 @@ def setCompleted(self):
pass
except:
self.status = DROPStates.ERROR
logger.error("Path not accessible: %s" % self.path)
logger.error("Path not accessible: %s", self.path)
self._size = 0
# Signal our subscribers that the show is over
self._fire("dropCompleted", status=DROPStates.COMPLETED)
@@ -1775,7 +1774,7 @@ def setCompleted(self):
try:
stat = self.getIO().fileStatus()
logger.debug(
"Setting size of NGASDrop %s to %s" % (self.fileId, stat["FileSize"])
"Setting size of NGASDrop %s to %s", self.fileId, stat["FileSize"]
)
self._size = int(stat["FileSize"])
except:
@@ -2658,7 +2657,7 @@ def execute(self, _send_notifications=True):
return
tries += 1
logger.exception(
"Error while executing %r (try %d/%d)" % (self, tries, self.n_tries)
"Error while executing %r (try %d/%d)", self, tries, self.n_tries
)

# We gave up running the application, go to error
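
Note the `_getArg` hunk at the top of this file: besides switching to lazy arguments it drops an explicit `logger.isEnabledFor(logging.DEBUG)` guard. With lazy arguments the handler interpolates the message only when the record is actually emitted, so such guards are redundant unless computing the arguments themselves is expensive. A minimal sketch (the `Chatty` class is illustrative only):

    import logging

    logger = logging.getLogger("demo")
    logger.setLevel(logging.INFO)  # DEBUG records are dropped

    class Chatty:
        def __repr__(self):
            print("repr() was called")
            return "<Chatty>"

    logger.debug("value: %r", Chatty())   # lazy: record dropped, repr() never runs
    logger.debug("value: %r" % Chatty())  # eager: repr() runs even though nothing is logged
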
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/io.py
@@ -597,7 +597,7 @@ def _close(self, **kwargs):
# If length wasn't known up-front we first send Content-Length and then the buffer here.
conn.putheader("Content-Length", len(self._buf))
conn.endheaders()
logger.debug("Sending data for file %s to NGAS" % (self._fileId))
logger.debug("Sending data for file %s to NGAS", self._fileId)
conn.send(self._buf)
self._buf = None
else:
@@ -618,7 +618,7 @@ def _write(self, data, **kwargs) -> int:
self._buf += data
else:
self._desc.send(data)
logger.debug("Wrote %s bytes" % len(data))
logger.debug("Wrote %s bytes", len(data))
return len(data)

def exists(self) -> bool:
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/lifecycle/hsm/manager.py
@@ -43,7 +43,7 @@ def addStore(self, newStore):
"""
@param newStore store.AbstractStore
"""
logger.debug("Adding store to HSM: " + str(newStore))
logger.debug("Adding store to HSM: %s", str(newStore))
self._stores.append(newStore)

def getSlowestStore(self):
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/lifecycle/hsm/store.py
@@ -60,8 +60,8 @@ def updateSpaces(self):
total = self.getTotalSpace()
perc = avail * 100.0 / total
logger.debug(
"Available/Total space on %s: %d/%d (%.2f %%)"
% (self, avail, total, perc)
"Available/Total space on %s: %d/%d (%.2f %%)",
self, avail, total, perc
)
pass

2 changes: 1 addition & 1 deletion daliuge-engine/dlg/lifecycle/registry.py
@@ -162,7 +162,7 @@ def __init__(self, dbModuleName, *connArgs):
self._connArgs = connArgs
except:
logger.error(
"Cannot import module %s, RDBMSRegistry cannot start" % (dbModuleName)
"Cannot import module %s, RDBMSRegistry cannot start", dbModuleName
)
raise

4 changes: 2 additions & 2 deletions daliuge-engine/dlg/manager/cmdline.py
@@ -69,7 +69,7 @@ def launchServer(opts):
dmName = opts.dmType.__name__

logger.info("DALiuGE version %s running at %s", version.full_version, os.getcwd())
logger.info("Creating %s" % (dmName))
logger.info("Creating %s", dmName)
try:
dm = opts.dmType(*opts.dmArgs, **opts.dmKwargs)
except:
@@ -86,7 +86,7 @@ def handle_signal(signNo, stack_frame):
if _terminating:
return
_terminating = True
logger.info("Exiting from %s" % (dmName))
logger.info("Exiting from %s", dmName)

server.stop_manager()

