Skip to content

Commit

Permalink
Added logging information for Copy operations
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Feb 1, 2023
1 parent 1bce7c7 commit 2ae3f65
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
8 changes: 8 additions & 0 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,15 @@ class CopyApp(BarrierAppDROP):

def run(self):
logger.debug("Using buffer size %d", self.bufsize)
logger.info(
"Copying data from inputs %s to outputs %s",
[x.name for x in self.inputs],
[x.name for x in self.outputs],
)
self.copyAll()
logger.info(
"Copy finished",
)

def copyAll(self):
for inputDrop in self.inputs:
Expand Down
35 changes: 25 additions & 10 deletions daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_sleepapp(self):
def _test_copyapp_simple(self, app):

# Again, not foo fancy, simple apps require simple tests
a, c = (InMemoryDROP(x, x) for x in ("a", "c"))
a, c = (InMemoryDROP(x, x, nm=x) for x in ("a", "c"))
b = app("b", "b")
b.addInput(a)
b.addOutput(c)
Expand All @@ -87,7 +87,7 @@ def _test_copyapp_simple(self, app):
def _test_copyapp_order_preserved(self, app):

# Inputs are copied in the order they are added
a, b, d = (InMemoryDROP(x, x) for x in ("a", "b", "d"))
a, b, d = (InMemoryDROP(x, x, nm=x) for x in ("a", "b", "d"))
c = app("c", "c")
for x in a, b:
c.addInput(x)
Expand Down Expand Up @@ -147,7 +147,9 @@ def test_helloworldapp(self):
h.addOutput(b)
b.addProducer(h)
h.execute()
self.assertEqual(h.greeting.encode("utf8"), droputils.allDropContents(b))
self.assertEqual(
h.greeting.encode("utf8"), droputils.allDropContents(b)
)

def test_parallelHelloWorld(self):
m0 = InMemoryDROP("m0", "m0")
Expand All @@ -172,7 +174,8 @@ def test_parallelHelloWorld(self):
self._test_graph_runs(ad, m0, f)
for i in range(len(f)):
self.assertEqual(
("Hello %s" % greets[i]).encode("utf8"), droputils.allDropContents(f[i])
("Hello %s" % greets[i]).encode("utf8"),
droputils.allDropContents(f[i]),
)

def test_ngasio(self):
Expand Down Expand Up @@ -236,7 +239,9 @@ def test_genericNpyScatter_multi(self):
c = InMemoryDROP("c", "c")
droputils.save_numpy(b, data1_in)
droputils.save_numpy(c, data2_in)
s = GenericNpyScatterApp("s", "s", num_of_copies=2, scatter_axes="[0,0]")
s = GenericNpyScatterApp(
"s", "s", num_of_copies=2, scatter_axes="[0,0]"
)
s.addInput(b)
s.addInput(c)
o1 = InMemoryDROP("o1", "o1")
Expand Down Expand Up @@ -272,7 +277,8 @@ def test_listappendthrashing(self, size=1000):
self.assertEqual(b.marray, data_out)

@unittest.skipIf(
sys.version_info < (3, 8), "Multiprocessing not compatible with Python < 3.8"
sys.version_info < (3, 8),
"Multiprocessing not compatible with Python < 3.8",
)
def test_multi_listappendthrashing(self, size=1000, parallel=True):
max_threads = cpu_count(logical=False)
Expand All @@ -285,13 +291,18 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
X = AverageArraysApp("X", "X")
Z = InMemoryDROP("Z", "Z")
drops = [ListAppendThrashingApp(x, x, size=size) for x in drop_ids]
mdrops = [InMemoryDROP(chr(65 + x), chr(65 + x)) for x in range(max_threads)]
mdrops = [
InMemoryDROP(chr(65 + x), chr(65 + x)) for x in range(max_threads)
]
if parallel:
# a bit of magic to get the app drops using the processes
_ = [drop.__setattr__("_tp", threadpool) for drop in drops]
_ = [drop.__setattr__("_tp", threadpool) for drop in mdrops]
_ = [drop.__setattr__("_sessID", session_id) for drop in mdrops]
_ = [memory_manager.register_drop(drop.uid, session_id) for drop in mdrops]
_ = [
memory_manager.register_drop(drop.uid, session_id)
for drop in mdrops
]
X.__setattr__("_tp", threadpool)
Z.__setattr__("_tp", threadpool)
Z.__setattr__("_sessID", session_id)
Expand All @@ -301,7 +312,9 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
_ = [d.addOutput(m) for d, m in zip(drops, mdrops)]
_ = [X.addInput(m) for m in mdrops]
X.addOutput(Z)
logger.info(f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}")
logger.info(
f"Number of inputs/outputs: {len(X.inputs)}, {len(X.outputs)}"
)
self._test_graph_runs([S, X, Z] + drops + mdrops, S, Z, timeout=200)
# Need to run our 'copy' of the averaging APP
num_array = []
Expand Down Expand Up @@ -337,7 +350,9 @@ def test_speedup(self):
st = time.time()
self.test_multi_listappendthrashing(size=size, parallel=True)
t2 = time.time() - st
logger.info(f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores")
logger.info(
f"Speedup: {t1 / t2:.2f} from {cpu_count(logical=False)} cores"
)
# TODO: This is unpredictable, but maybe we can do something meaningful.
# self.assertAlmostEqual(t1/cpu_count(logical=False), t2, 1)
# How about this? We only need to see some type of speedup
Expand Down

0 comments on commit 2ae3f65

Please sign in to comment.