Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Liu 128 #79

Merged
merged 63 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
a92d837
Adds ListAppendThrashingApp - a toy BarrierAppDROP that wastes lots o…
pritchardn Aug 26, 2021
6384139
Drops spawn a process when executed within a thread-pool.
pritchardn Aug 26, 2021
8ef60e0
Adds a test for ListAppendThrashingApp
pritchardn Aug 26, 2021
23a2e88
Added parallel test hooks
awicenec Sep 13, 2021
06e3bd0
Reverted default back to no threads, else dask_emulation fails
awicenec Sep 13, 2021
098eb4c
fixed hugegraph test
awicenec Sep 13, 2021
2fcc18b
multiprocessing graphs only work with FileDROPs right now
awicenec Sep 13, 2021
c62f950
Merge branch 'liu-174' into LIU-128
awicenec Sep 13, 2021
ee8ce54
Fixed crc32 deprecation warnings
awicenec Sep 13, 2021
ae24061
Fixed doc tree
awicenec Sep 13, 2021
3c9a435
Adds a basic, technically functioning but non-deterministic sharedMem…
pritchardn Sep 17, 2021
e3e0a65
Adds a deterministic, technically functional but still incorrect memo…
pritchardn Sep 24, 2021
a32edb8
Re-implements shared memory to write directly to posix_shm. This is f…
pritchardn Sep 24, 2021
fcea8d7
Makes generateArray an N^2 operation to show speedup more clearly.
pritchardn Sep 28, 2021
50d324e
When opening an existing shmem location, finds and sets size correctly.
pritchardn Sep 28, 2021
b143b0c
Minor readibility fix for test_multi_listappendthrashing.
pritchardn Sep 28, 2021
a7232b0
Condenses _mm attribute for sharedmemory into the single _tp attribute.
pritchardn Sep 28, 2021
8fc06c3
Adds session_id information to shared memory drops along with updated…
pritchardn Sep 28, 2021
b6466e1
Adds (untested) addition to NodeManager for shared memory drop initia…
pritchardn Sep 28, 2021
dc23202
separates shared memory manager and shared memory implementation into…
pritchardn Sep 29, 2021
02b1fc4
Implementation cleanup for shared_memory_manager.py
pritchardn Sep 29, 2021
0d87648
Implementation cleanup for shared_memory.py
pritchardn Sep 29, 2021
991c338
Shared memory handles shrinking a block.
pritchardn Sep 29, 2021
0a6c676
shared_memory.py now supports:
pritchardn Sep 29, 2021
5960def
shared_memory.py now supports:
pritchardn Sep 29, 2021
2ccdb94
Shared Memory manager now shuts down when deleted. (Consider changing…
pritchardn Sep 29, 2021
e9ea24d
Adds tests for the DlgSharedMemoryManager
pritchardn Sep 29, 2021
fbda5bc
Adds a DlgProcess class to setup error listeners via pipes
pritchardn Oct 4, 2021
6905a79
DropProxys now make their own client to handle multiprocessing.
pritchardn Oct 20, 2021
61b6a87
SharedMemory now truncates shared memory blocks aggressively on write…
pritchardn Oct 20, 2021
a850b16
Updates copyDropContents to only report on read data if data was actu…
pritchardn Oct 20, 2021
28f65c5
SumupContainerChecksum only performs the sum if the checksum is not N…
pritchardn Oct 20, 2021
3408100
Adds parallel dm tests
pritchardn Oct 20, 2021
985de06
Removes unnecessary and error-inducing import
pritchardn Oct 20, 2021
ce6c249
RPC clients now shutdown their own context on deletion.
pritchardn Oct 21, 2021
e33aeec
An attempt to investigate travis test failure (perhaps it is somethin…
pritchardn Oct 27, 2021
fb6d062
An attempt to investigate travis test failure (perhaps it is somethin…
pritchardn Oct 27, 2021
9538de3
Merge remote-tracking branch 'origin/LIU-128' into LIU-128
pritchardn Oct 27, 2021
00ead7f
Undoes an attempt to investigate travis test failure (perhaps it is s…
pritchardn Oct 27, 2021
095ad4a
Only threads with > 1 processor available
pritchardn Oct 27, 2021
033a1b1
test_dm parallel tests attempt to test with maximum cores.
pritchardn Oct 27, 2021
b1630fa
Undoes an admittedly gross way to bypass null checksum checking
pritchardn Oct 27, 2021
1c5d0e2
Storage drops who are queried for their checksum but do not yet have …
pritchardn Oct 27, 2021
bdeb99b
Fixes test_averagearraysapp whose behaviour does not work when treati…
pritchardn Oct 27, 2021
1d493e3
dropWrote from Outside now assumes that an externally written drop ca…
pritchardn Oct 27, 2021
671f6b4
Fixes dynamic checksum generation to deal with bytes arrays too.
pritchardn Oct 27, 2021
1e9f5a1
Fixes test_averagearraysapp to compare only a single average.
pritchardn Oct 27, 2021
6b26963
Changes shared-memory manager behaviour to only close blocks (other m…
pritchardn Oct 27, 2021
e305ba5
Catches the most esquisite edge-case where shared files are simultane…
pritchardn Oct 28, 2021
73f28dc
Adds a first-cut of documentation for the shared memory feature.
pritchardn Oct 28, 2021
058e4b3
Re-orders tests to establish if tests-fail due to networking issues o…
pritchardn Oct 28, 2021
8db4ddd
Adds import guarding for Python < 3.8 - _posixshm did not exist until…
pritchardn Oct 29, 2021
4598d33
Merge branch 'master' into LIU-128
pritchardn Nov 2, 2021
0ba3d87
Removes unused .rst file.
pritchardn Nov 3, 2021
1a2a260
Makes session ids random in test_dm.py
pritchardn Nov 3, 2021
cfae921
Tests serial dm first, then parallel.
pritchardn Nov 3, 2021
90678bd
Removes pesky tests (investigating if error cleanup is causing issue).
pritchardn Nov 3, 2021
705a19e
Skips parallel error tests (to investigate if these are the true cause).
pritchardn Nov 3, 2021
3f4ac1f
Conditionally skips multiprocessing tests.
pritchardn Nov 3, 2021
1b22c4c
Merge remote-tracking branch 'origin/master' into LIU-128
pritchardn Nov 4, 2021
310b431
Continued merging process.
pritchardn Nov 4, 2021
7ede09b
Merge branch 'master' into LIU-128
pritchardn Dec 9, 2021
c8d7f15
Changes crc32c to crc32
pritchardn Dec 9, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ..meta import dlg_component, dlg_batch_input, dlg_batch_output, dlg_streaming_input

try:
from crc32c import crc32c # @UnusedImport
from crc32c import crc32c as crc32 # @UnusedImport
except:
from binascii import crc32 # @Reimport

Expand Down Expand Up @@ -63,7 +63,7 @@ def run(self):
buf = inputDrop.read(desc, bufsize)
crc = 0
while buf:
crc = crc32c(buf, crc)
crc = crc32(buf, crc)
buf = inputDrop.read(desc, bufsize)
inputDrop.close(desc)

Expand Down Expand Up @@ -102,7 +102,7 @@ def initialize(self, **kwargs):

def dataWritten(self, uid, data):
self.execStatus = AppDROPStates.RUNNING
self._crc = crc32c(data, self._crc)
self._crc = crc32(data, self._crc)

def dropCompleted(self, uid, status):
outputDrop = self.outputs[0]
Expand Down
83 changes: 78 additions & 5 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
import urllib.error
import urllib.request

Expand Down Expand Up @@ -256,7 +257,10 @@ class AverageArraysApp(BarrierAppDROP):
# default values
methods = ["mean", "median"]
method = dlg_string_param("method", methods[0])
marray = []

def __init__(self, oid, uid, **kwargs):
super().__init__(oid, kwargs)
self.marray = []

def initialize(self, **kwargs):
super(AverageArraysApp, self).initialize(**kwargs)
Expand All @@ -268,9 +272,9 @@ def run(self):
if len(outs) < 1:
raise Exception("At least one output should have been added to %r" % self)
self.getInputArrays()
avg = self.averageArray()
self._avg = self.averageArray()
for o in outs:
d = pickle.dumps(avg)
d = pickle.dumps(self._avg)
o.len = len(d)
o.write(d) # average across inputs

Expand All @@ -283,8 +287,14 @@ def getInputArrays(self):
ins = self.inputs
if len(ins) < 1:
raise Exception("At least one input should have been added to %r" % self)

marray = [pickle.loads(droputils.allDropContents(inp)) for inp in ins]
marray = []
for inp in ins:
sarray = droputils.allDropContents(inp)
if len(sarray) == 0:
print(f"Input does not contain data!")
else:
sarray = pickle.loads(sarray)
marray.extend(sarray)
self.marray = marray

def averageArray(self):
Expand Down Expand Up @@ -462,3 +472,66 @@ def run(self):

def condition(self):
return self.result



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends a random integer to a list num times.
# This is a CPU intensive operation and can thus be used to provide a test for application threading
# since this operation will not yield.
# The resulting array will be sent to all connected output apps.
# @par EAGLE_START
# @param gitrepo $(GIT_REPO)
# @param version $(PROJECT_VERSION)
# @param category PythonApp
# @param[in] param/size/100/Integer/readwrite
# \~English the size of the array\n
# @param[in] param/appclass/dlg.apps.simple.ListAppendThrashingApp/String/readonly
# \~English Application class\n
# @param[out] port/array
# \~English Port carrying the random array.
# @par EAGLE_END
class ListAppendThrashingApp(BarrierAppDROP):
"""
A BarrierAppDrop that appends random integers to a list N times. It does
not require any inputs and writes the generated array to all of its
outputs.

Keywords:

size: int, number of array elements
"""
compontent_meta = dlg_component('ListAppendThrashingApp', 'List Append Thrashing',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def initialize(self, **kwargs):
self.size = self._getArg(kwargs, 'size', 100)
self.marray = []
super(ListAppendThrashingApp, self).initialize(**kwargs)

def run(self):
# At least one output should have been added
outs = self.outputs
if len(outs) < 1:
raise Exception(
'At least one output should have been added to %r' % self)
self.marray = self.generateArray()
for o in outs:
d = pickle.dumps(self.marray)
o.len = len(d)
o.write(pickle.dumps(self.marray))

def generateArray(self):
# This operation is wasteful to simulate an N^2 operation.
marray = []
for _ in range(int(self.size)):
marray = []
for i in range(int(self.size)):
marray.append(random.random())
return marray

def _getArray(self):
return self.marray
29 changes: 27 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import threading
import time
import re
import sys
import inspect
import binascii

Expand Down Expand Up @@ -67,7 +68,12 @@
PlasmaIO,
PlasmaFlightIO,
)
if sys.version_info >= (3, 8):
from .io import SharedMemoryIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param
from dlg.process import DlgProcess
from .meta import (
dlg_float_param,
dlg_int_param,
Expand Down Expand Up @@ -634,6 +640,15 @@ def checksum(self):

:see: `self.checksumType`
"""
if self.status == DROPStates.COMPLETED and self._checksum is None:
# Generate on the fly
io = self.getIO()
io.open(OpenMode.OPEN_READ)
data = io.read(4096)
while data is not None and len(data) > 0:
self._updateChecksum(data)
data = io.read(4096)
io.close()
return self._checksum

@checksum.setter
Expand Down Expand Up @@ -1416,7 +1431,10 @@ def initialize(self, **kwargs):
self._buf = io.BytesIO(*args)

def getIO(self):
return MemoryIO(self._buf)
if hasattr(self, '_tp') and hasattr(self, '_sessID') and sys.version_info >= (3, 8):
return SharedMemoryIO(self.oid, self._sessID)
else:
return MemoryIO(self._buf)

@property
def dataURL(self):
Expand Down Expand Up @@ -1986,7 +2004,14 @@ def execute(self, _send_notifications=True):
self.execStatus = AppDROPStates.RUNNING
while tries < self.n_tries:
try:
self.run()
if hasattr(self, '_tp'):
proc = DlgProcess(target=self.run, daemon=True)
proc.start()
proc.join()
if proc.exception:
raise proc.exception
else:
self.run()
if self.execStatus == AppDROPStates.CANCELLED:
return
self.execStatus = AppDROPStates.FINISHED
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def copyDropContents(source, target, bufsize=4096):
target.write(buf)
logger.debug(f"Wrote {len(buf)} bytes to {repr(target)}")
buf = source.read(desc, bufsize)
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
if buf is not None:
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
source.close(desc)


Expand Down
69 changes: 67 additions & 2 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import io
import logging
import os
import sys
import urllib.parse

from abc import abstractmethod, ABCMeta
from typing import Optional, Union

from . import ngaslite
from .apps.plasmaflight import PlasmaFlightClient

import pyarrow
import pyarrow.plasma as plasma
if sys.version_info >= (3, 8):
from dlg.shared_memory import DlgSharedMemory


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -219,9 +222,10 @@ class MemoryIO(DataIO):
A DataIO class that reads/write from/into the BytesIO object given at
construction time
"""
_desc: io.BytesIO
_desc: io.BytesIO # TODO: This might actually be a problem

def __init__(self, buf: io.BytesIO, **kwargs):
super().__init__()
self._buf = buf

def _open(self, **kwargs):
Expand Down Expand Up @@ -257,9 +261,70 @@ def delete(self):

@overrides
def buffer(self) -> memoryview:
# TODO: This may also be an issue
return self._open().getbuffer()



class SharedMemoryIO(DataIO):
"""
A DataIO class that writes to a shared memory buffer
"""

def __init__(self, uid, session_id, **kwargs):
super().__init__()
self._name = f'{session_id}_{uid}'
self._written = 0
self._pos = 0
self._buf = None

def _open(self, **kwargs):
self._pos = 0
if self._buf is None:
if self._mode == OpenMode.OPEN_WRITE:
self._written = 0
self._buf = DlgSharedMemory(self._name)
return self._buf

def _write(self, data, **kwargs):
total_size = len(data) + self._written
if total_size > self._buf.size:
self._buf.resize(total_size)
self._buf.buf[self._written:total_size] = data
self._written = total_size
else:
self._buf.buf[self._written:total_size] = data
self._written = total_size
self._buf.resize(total_size)
"""
It may be inefficient to resize many times, but assuming data is written 'once' this is
might be tolerable and guarantees that the size of the underlying buffer is tight.
"""
return len(data)

def _read(self, count=4096, **kwargs):
if self._pos == self._buf.size:
return None
start = self._pos
end = self._pos + count
end = min(end, self._buf.size)
out = self._buf.buf[start:end]
self._pos = end
return out

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
self._buf.resize(self._written)
self._buf.close()
self._buf = None

def exists(self):
return self._buf is not None

def delete(self):
self._close()


class FileIO(DataIO):
_desc: io.BufferedReader
def __init__(self, filename, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def dlgNM(parser, args):
action="store",
type="int",
dest="max_threads",
help="Max thread pool size used for executing drops. 0 (default) means no pool.",
help="Max thread pool size used for executing drops. -1 means use all CPUs. 0 (default) means no threads.",
default=0,
)
(options, args) = parser.parse_args(args)
Expand Down
Loading