In [173]:
pserver.stop()

Closed command server
Closed parameter server


In [174]:
from parameter_server import *
pserver = ParameterServer()

pserver.add_listener("127.0.0.1", 9010)

Parameter server started on ('127.0.0.1', 9011)
Starting on port 14000
Serving on ('127.0.0.1', 14000)
Registering listener
Unhandled command [/requestListenerInfo] ~ 16987


In [175]:
import json

class DiskBufferWriter(object):
    def __init__(self, name, filename, path):
        self._data = None
        self.name:str = name
        self._filename:str = filename
        self._path:str = path
        
        self._cache_size:int  = -1
        self._cache_counter: int = 0
        
        self._file_lock:bool = False
        pass
    
    def allow_cache(self, cache_size: int = 0):
        # 0 is unlimited cache
        self._cache_size = cache_size
    
    def use_file_lock(self, use:bool = True):
        print("File lock not implemented yet")
        self._file_lock = use
    
    @property
    def data(self):
        return self._data

    @data.setter
    def data(self, data):
        self._data = data
        
        # TODO implement file lock
        
        outname = self._filename
        if self._cache_size >=0:
            try:
                index_dot = outname.index('.')
                prefix = outname[0: index_dot]
                suffix = outname[index_dot:]
            except:
                prefix = outname
                suffix = ''
            if self._cache_size > 0 and self._cache_counter == self._cache_size:
                self._cache_counter = 0
            outname = prefix + '_' + str(self._cache_counter) + suffix
            self._cache_counter += 1
        
        with open(self._path + outname, 'w') as outfile:
            json.dump(data, outfile)
        
        for l in self.pserver.listeners:
            address = "/__DiskBuffer/" + self.name
            
            print('sending ' + address)
            print(l.__dict__)
            l.send_message(address, outname)
            
    def expose_to_network(self, pserver: ParameterServer):
        self.pserver = pserver

In [176]:
dataWriter = DiskBufferWriter('sine_data','output.json', 'bin/')
dataWriter.expose_to_network(pserver)


In [177]:
import random, math

numpoints = random.randint(4,20)

rnums = [random.random() for i in range(15)]
rnums.sort()
dataWriter.data = {'random' : [-1.0 + (n* 2.0) for n in rnums],
                   'sines' : [math.sin(r* 2 * math.pi) for r in rnums]
                  }

sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=70, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 0)>, '_address': '127.0.0.1', '_port': 9010}


In [130]:
dataWriter.data

{'random': [-0.8366654232642539,
  -0.8331180749841103,
  -0.7790644087513241,
  -0.7686759558364216,
  -0.7045546373219553,
  -0.4930698661895523,
  -0.05864524091612289,
  0.38753981787558134,
  0.4546406438625832,
  0.5457110947082044,
  0.5751272361207138,
  0.6158246132401255,
  0.6566437794149007,
  0.8976172962215518,
  0.9437203944664998],
 'sines': [0.49090715680244246,
  0.5005855388109317,
  0.6396859615066561,
  0.6644263027839237,
  0.8005239523035943,
  0.9997630068271156,
  0.1831989161282138,
  -0.9382346248995734,
  -0.9898639563127314,
  -0.9897064184500788,
  -0.9722765289887373,
  -0.9345251813381559,
  -0.8813374273313281,
  -0.3161273894257766,
  -0.17588783923844695]}

In [131]:
import time
for i in range(20):
    numpoints = random.randint(4,20)

    rnums = [random.random() for i in range(15)]
    rnums.sort()
    dataWriter.data = {'random' : [-1.0 + (n* 2.0) for n in rnums],
                       'sines' : [math.sin(r* 2 * math.pi) for r in rnums]
                      }
    time.sleep(0.1)

sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=66, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 63148)>, '_address': '127.0.0.1', '_port': 9010}
sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=66, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 63148)>, '_address': '127.0.0.1', '_port': 9010}
sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=66, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 63148)>, '_address': '127.0.0.1', '_port': 9010}
sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=66, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 63148)>, '_address': '127.0.0.1', '_port': 9010}
sending /__DiskBuffer/sine_data
{'_sock': <socket.socket fd=66, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 63148)>, '_address': '127.0.0.1', '_port': 9010}
sending /_

In [178]:
dataWriter2 = DiskBufferWriter('background','background.json', 'bin/')
dataWriter2.expose_to_network(pserver)
dataWriter2.allow_cache(10)

In [271]:
import random, math

dataWriter2.data = [random.random() for i in range(3)]

sending /__DiskBuffer/background
{'_sock': <socket.socket fd=70, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('0.0.0.0', 56154)>, '_address': '127.0.0.1', '_port': 9010}


In [120]:
pserver.stop()

Closed command server
Closed parameter server
