diff --git a/ipc_benchmark.py b/ipc_benchmark.py index 24b962f..5c24bde 100755 --- a/ipc_benchmark.py +++ b/ipc_benchmark.py @@ -103,6 +103,7 @@ def remove(self): self.ptr = None if self.shmid and self.is_creator: shmctl(self.shmid, IPC_RMID, 0) + print("Removed shmid %i" % self.shmid) self.shmid = None def __del__(self): @@ -124,8 +125,9 @@ def next_power_of_two(n): class SharedNumpyArray: # cls members - server_instances = set() - extra_space = 4048 + ServerInstances = set() + MaxServerInstances = 2 + ExtraSpaceBytes = 4048 # local members is_server = False mem = None @@ -135,7 +137,7 @@ class SharedNumpyArray: @classmethod def needed_mem_size(cls, shape, typestr): itemsize = int(typestr[2:]) - mem_size = cls.extra_space + itemsize * numpy.prod(shape) + mem_size = cls.ExtraSpaceBytes + itemsize * numpy.prod(shape) return mem_size @classmethod @@ -153,10 +155,11 @@ def create_copy(cls, array): @classmethod def create_new(cls, shape, strides, typestr): needed_mem_size = cls.needed_mem_size(shape=shape, typestr=typestr) - for inst in cls.server_instances: + for inst in cls.ServerInstances: assert isinstance(inst, SharedNumpyArray) if inst.is_in_use(): continue - if inst.mem.size < needed_mem_size: continue + if inst.mem.size < needed_mem_size: + inst._init_mem(shape=shape, typestr=typestr) inst._set_is_used(1) inst._create_numpy(shape=shape, strides=strides, typestr=typestr) return inst @@ -168,11 +171,11 @@ def create_from_shared(cls, shape, strides, typestr, mem): def __init__(self, shape, strides, typestr, mem=None): if not mem: + assert len(self.ServerInstances) < self.MaxServerInstances self.is_server = True - mem_size = next_power_of_two(self.needed_mem_size(shape=shape, typestr=typestr)) - self.mem = SharedMem(size=mem_size) + self._init_mem(shape=shape, typestr=typestr) self._set_is_used(1) - self.server_instances.add(self) + self.ServerInstances.add(self) else: self.is_server = False mem_size = self.needed_mem_size(shape=shape, typestr=typestr) @@ -183,6 +186,14 @@ def __init__(self, shape, strides, typestr, mem=None): self.mem = mem self._create_numpy(shape=shape, strides=strides, typestr=typestr) + def _init_mem(self, shape, typestr): + assert self.is_server + if self.mem: + self.mem.remove() + self.mem = None + mem_size = next_power_of_two(self.needed_mem_size(shape=shape, typestr=typestr)) + self.mem = SharedMem(size=mem_size) + def _create_numpy(self, shape, strides, typestr): assert self.mem.ptr > 0 self.shape = shape @@ -190,7 +201,7 @@ def _create_numpy(self, shape, strides, typestr): self.typestr = typestr # http://docs.scipy.org/doc/numpy/reference/arrays.interface.html array_intf = { - "data": (self.mem.ptr + self.extra_space, False), + "data": (self.mem.ptr + self.ExtraSpaceBytes, False), "shape": shape, "strides": strides, 'typestr': typestr, @@ -296,6 +307,8 @@ def demo(): assert isinstance(m2, numpy.ndarray) assert m.shape == m2.shape assert numpy.isclose(m, m2).all() + del m2 + gc.collect() print("Copying done, exiting.") pickle(p.stdin, ("exit",)) out, = unpickle(p.stdout) @@ -316,11 +329,12 @@ def demo_client(): pickle(out_stream, ("exit",)) break elif cmd[0] == "ping": - a = cmd[1] - assert isinstance(a, numpy.ndarray) - pickle(out_stream, ("pong", a)) + assert isinstance(cmd[1], numpy.ndarray) + pickle(out_stream, ("pong", cmd[1])) else: assert False, "unknown: %r" % cmd + del cmd + gc.collect() print("Exit from client!")