## Goal net Architecture sandbox
Created: 12/02/19 Danil Lykov



This notebook is a minimal implementation of the scheme shown below.

The goal of this sandbox notebook is to start a skeleton for the project and set up all 
interfaces that are to be extended with additional modules.

Minimal versions of the modules are to be provided.
![img](./res/schem.png)



The directory structure of the project may be as follows:

    core/
        MUX/
        DMS/
        Modules/
            echo/
            pomodoro/
            brain/
            logger/
    connectors/
        web/
        browser/
        linux/
        telegram/
        External/
    

In [62]:
import multiprocessing as prc
import multiprocessing.dummy as thr

import zmq, os, time
import json
from pprint import pprint 

In [96]:
class NetworkConfig:
    """
    Store a node-name -> address mapping and let processes look up the
    network addresses they need to bind or connect to.

    TODO:
    generate personalized configs to take into account whether processes run on the same machine
    """
    def __init__(self, conf=None):
        """
        conf: optional dict mapping node names to address strings. A dict
        that is passed in is stored as-is (not copied); when omitted, each
        instance gets its own fresh empty dict.
        """
        # A literal `{}` default is evaluated once at definition time and
        # would be shared by every instance created without arguments.
        self.conf = {} if conf is None else conf

    def add_node(self, name, address):
        """Register the address of node `name`, replacing any previous one."""
        self.conf[name] = address

    def get_address(self, name):
        """Return the address stored for `name`; raises KeyError if unknown."""
        return self.conf[name]
    
def gen_network_config(target_process=None):
    """
    Build the NetworkConfig used by the sandbox: every node gets an address
    on the loopback interface with its own fixed TCP port.

    target_process is accepted but currently not used.
    """
    loopback = 'tcp://127.0.0.1:'
    node_ports = (
        ('MUX_in', '9000'),
        ('MUX_out', '9001'),
        ('DMX', '9002'),
        ('DB', '9003'),
        ('console', '9101'),
        ('logger', '9011'),
        ('echo', '9012'),
    )
    addresses = {node: loopback + port for node, port in node_ports}
    return NetworkConfig(addresses)
    

In [143]:

def filter_action(action):
    """
    Gate applied to every incoming action.

    Returns the action itself when it carries both a truthy 'appid' and a
    truthy 'user_id'; otherwise returns None. Only the presence of the two
    fields is checked here - verifying that the connector (app id) may act
    on behalf of the user is not implemented yet.
    """
    if action.get('appid') and action.get('user_id'):
        return action
    return None

def MUX(my_name, network_config):
    """
    Multiplexer loop: pull actions sent by connectors, filter them and
    publish the accepted ones under the 'module' topic.

    my_name: prefix used to look up the '<my_name>_in' (PULL, bound) and
        '<my_name>_out' (PUB, bound) addresses in network_config.
    network_config: object providing get_address(name).

    Never returns; runs until the process is stopped.
    """
    print("MUX is listening  named as:%s"%(my_name))
    in_addr = network_config.get_address(my_name+'_in')
    out_addr = network_config.get_address(my_name+'_out')
    context = zmq.Context()

    def bound_socket(kind, banner, addr):
        # Create the socket, announce the address, then bind to it.
        sock = context.socket(kind)
        print(banner, addr)
        sock.bind(addr)
        return sock

    inbound = bound_socket(zmq.PULL, "Binding MUX in to", in_addr)
    outbound = bound_socket(zmq.PUB, "Binding MUX out ", out_addr)

    while True:
        incoming = inbound.recv_json()
        print("MUX in action:")
        print(incoming)
        accepted = filter_action(incoming)
        if not accepted:
            print("action forbidden")
            continue
        print("MUX out")
        outbound.send_string(themify('module', accepted))


In [144]:
class ConsoleConnector:
    """
    Simple connector that forwards text as actions on behalf of the user
    with id 1 and prints every notification it receives.
    """
    def __init__(self,netconf,name='console'):
        """Connect the outgoing socket to the MUX, then bind the notification socket."""
        self.sink = self._get_mux_socket(netconf)
        self.source = self._get_source_socket(name,netconf)
        self.appid = '1'

    def _get_source_socket(self, name, netconf):
        """Return a REP socket bound to the address registered for `name`."""
        address = netconf.get_address(name)
        context = zmq.Context()
        rep = context.socket(zmq.REP)
        print("Console connector is listening at %s named %s"%(address,name))
        rep.bind(address)
        return rep

    def _get_mux_socket(self,netconf):
        """Return a PUSH socket connected to the 'MUX_in' address."""
        context = zmq.Context()
        push = context.socket(zmq.PUSH)
        push.connect(netconf.get_address("MUX_in"))
        return push

    def start(self,pipe):
        """
        Run the connector forever: a background thread answers incoming
        notifications while this thread forwards every text received from
        `pipe` to the MUX.
        """
        def answer_notifications():
            print("console connector listening for notification...")
            while True:
                notification = self.source.recv_json()
                print('console got notif:',notification)
                # The REP socket must reply before it can receive again.
                self.source.send_string("CONSOLE OK")

        thr.Process(target=answer_notifications).start()

        while True:
            self.send(pipe.recv())
            time.sleep(0.01)

    def send(self,text):
        """Wrap `text` into an action message and push it to the MUX."""
        self.sink.send_json({
            'text': text,
            'user_id': 1,
            'appid': self.appid,
        })
        
            
def launch_console_connector(name,netconf,pipe):
    """Process entry point: build a ConsoleConnector named `name` and run it on `pipe`."""
    ConsoleConnector(netconf, name=name).start(pipe)

In [145]:
def themify(topic,msg):
    """Return `topic`, a single space, and `msg` encoded as JSON."""
    payload = json.dumps(msg)
    return topic + ' ' + payload

def dethemify(topicmsg):
    """
    Inverse of themify(): split "<topic> <json>" into (topic, message).

    Each space is tried in turn as the topic/payload separator and the first
    one whose remainder is a complete JSON document wins. This works for any
    JSON payload (object, list, string, number, ...), not only for objects,
    and is not confused by a '{' inside a JSON string.

    Returns a (topic, msg) tuple; the topic is stripped of surrounding
    whitespace.
    Raises json.JSONDecodeError (a ValueError) when no payload can be decoded.
    """
    sep = topicmsg.find(' ')
    while sep != -1:
        try:
            msg = json.loads(topicmsg[sep + 1:])
        except ValueError:
            # Not a payload boundary (e.g. a space inside the topic).
            sep = topicmsg.find(' ', sep + 1)
        else:
            return topicmsg[:sep].strip(), msg

    # Fallback for input without a separating space: the payload is assumed
    # to be a JSON object starting at the first '{'.
    json0 = topicmsg.find('{')
    if json0 == -1:
        raise json.JSONDecodeError("no JSON payload found", topicmsg, 0)
    return topicmsg[:json0].strip(), json.loads(topicmsg[json0:])

class EchoModule:
    """
    Minimal module: every action received from the MUX is pushed, unchanged,
    to the DMX as a notification.
    """
    def __init__(self, netconf,name='echo'):
        """Open the MUX subscription first, then the DMX push socket. `name` is not used."""
        #TODO: make a base class of module that will use netconf to get provider
        self.source = self._get_mux_socket(netconf)
        self.sink = self._get_dmx_socket(netconf)

    def _get_dmx_socket(self,netconf):
        """Return a PUSH socket connected to the 'DMX' address."""
        context = zmq.Context()
        push = context.socket(zmq.PUSH)
        push.connect(netconf.get_address("DMX"))
        return push

    def _get_mux_socket_pull(self,netconf):
        """Return a PULL socket connected to 'MUX_out' (alternative to the SUB socket)."""
        context = zmq.Context()
        pull = context.socket(zmq.PULL)
        pull.connect(netconf.get_address("MUX_out"))
        return pull

    def _get_mux_socket(self,netconf):
        """Return a SUB socket connected to 'MUX_out', subscribed with an empty prefix."""
        context = zmq.Context()
        sub = context.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b'')
        sub.connect(netconf.get_address("MUX_out"))
        return sub

    def _recv(self):
        """Receive one published string and return its decoded message (topic dropped)."""
        _topic, payload = dethemify(self.source.recv_string())
        return payload

    def start(self):
        """Forward actions to the DMX forever."""
        while True:
            action = self._recv()
            print("echo got action")
            self.sink.send_json(action)
            
def launch_echo(name,netconf):
    """Process entry point: build an EchoModule named `name` and run it."""
    EchoModule(netconf, name=name).start()
            
class LoggerModule:
    """
    Minimal module that prints every action received from the MUX.
    A DMX push socket is opened as well, but nothing is sent through it.
    """
    def __init__(self, netconf,name='logger'):
        """Open the MUX subscription first, then the DMX push socket. `name` is not used."""
        #TODO: make a base class of module that will use netconf to get provider
        self.source = self._get_mux_socket(netconf)
        self.sink = self._get_dmx_socket(netconf)

    def _get_dmx_socket(self,netconf):
        """Return a PUSH socket connected to the 'DMX' address."""
        context = zmq.Context()
        push = context.socket(zmq.PUSH)
        push.connect(netconf.get_address("DMX"))
        return push

    def _recv(self):
        """Receive one published string and return its decoded message (topic dropped)."""
        _topic, payload = dethemify(self.source.recv_string())
        return payload

    def _get_mux_socket(self,netconf):
        """Return a SUB socket connected to 'MUX_out', subscribed with an empty prefix."""
        context = zmq.Context()
        sub = context.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b'')
        sub.connect(netconf.get_address("MUX_out"))
        return sub

    def start(self):
        """Print incoming actions forever."""
        while True:
            action = self._recv()
            print("Logger got action")
            print("log",action)
            
def launch_logger(name,netconf):
    """Process entry point: build a LoggerModule named `name` and run it."""
    LoggerModule(netconf, name=name).start()

In [146]:
def DMX(my_name, network_config):
    """
    Demultiplexer loop: pull notifications from modules and deliver each one
    to every known connector over a request/reply socket.

    my_name: name under which the PULL address is registered in network_config.
    network_config: object providing get_address(name).

    Never returns; runs until the process is stopped.
    """
    print("DMX is listening named as:%s"%my_name)
    pull_addr = network_config.get_address(my_name)
    # But maybe use PUB-SUB for output ??
    context = zmq.Context()
    inbound = context.socket(zmq.PULL)
    print("Binding DMX pull to",pull_addr)
    inbound.bind(pull_addr)

    # Resolve all connector addresses first, then open one REQ socket each.
    connector_names = ['console']
    connector_addrs = [network_config.get_address(conn) for conn in connector_names]
    requesters = {}
    for conn_name, conn_addr in zip(connector_names, connector_addrs):
        req = context.socket(zmq.REQ)
        req.connect(conn_addr)
        requesters[conn_name] = req

    while True:
        notif = inbound.recv_json()
        print("DMX in:")
        print(notif)
        # Send to every connector before collecting any reply.
        for conn_name, req in requesters.items():
            print("Sending notif to %s connector"%conn_name)
            req.send_json(notif)
        for conn_name, req in requesters.items():
            print("getting answer %s connector"%conn_name)
            reply = req.recv_string()
            print('dmx %s answ'%conn_name,reply)

In [147]:

def main():
    """
    Start every sandbox process (MUX, DMX, logger, echo, console connector)
    and feed lines typed by the user to the console connector through a pipe.

    Runs until an exception or interrupt (e.g. Ctrl-C / end of input) ends
    the input loop. Whatever the reason, every child process that is still
    running is terminated before the exception propagates to the caller.
    """
    netconf = gen_network_config()
    inp, out = prc.Pipe()

    # Order matters: processes are started in this order.
    processes = [
        prc.Process(target=MUX, args=('MUX', netconf), name='mux'),
        prc.Process(target=DMX, args=('DMX', netconf), name='dmx'),
        prc.Process(target=launch_logger, args=('logger', netconf), name='log'),
        prc.Process(target=launch_echo, args=('echo', netconf), name='echo'),
        prc.Process(target=launch_console_connector,
                    args=('console', netconf, out), name='console'),
    ]
    console = processes[-1]

    print("Goalnet sandbox is starting...")
    try:
        for proc in processes:
            proc.start()
        print("Console pid is:", console.pid)
        while True:
            inp_text = input('Action message:')
            inp.send(inp_text)
    except Exception:
        print("Exception, terminating all")
        raise
    finally:
        # `finally` also covers KeyboardInterrupt, which `except Exception`
        # does not catch. Only running processes are touched: terminate()
        # fails on a process that was never started.
        for proc in processes:
            if proc.is_alive():
                proc.terminate()
                proc.join(timeout=1.0)
    
# Launch the sandbox when this cell is executed.
main()

Goalnet sandbox is starting...
MUX is listening  named as:MUX
Binding MUX in to tcp://127.0.0.1:9000
Binding MUX out  tcp://127.0.0.1:9001
Console pid is: 5098
DMX is listening named as:DMX
Binding DMX pull to tcp://127.0.0.1:9002
Console connector is listening at tcp://127.0.0.1:9101 named console
console connector listening for notification...
Action message:asd
MUX in action:
{'text': 'asd', 'user_id': 1, 'appid': '1'}
MUX out
Logger got action
echo got action
log {'text': 'asd', 'user_id': 1, 'appid': '1'}
DMX in:
{'text': 'asd', 'user_id': 1, 'appid': '1'}
Sending notif to console connector
getting answer console connector
console got notif: {'text': 'asd', 'user_id': 1, 'appid': '1'}
dmx console answ CONSOLE OK
Action message:kfalj;s
MUX in action:
{'text': 'kfalj;s', 'user_id': 1, 'appid': '1'}
MUX out
echo got action
Logger got action
log {'text': 'kfalj;s', 'user_id': 1, 'appid': '1'}
DMX in:
{'text': 'kfalj;s', 'user_id': 1, 'appid': '1'}
Sending notif to console connector
ge

Process mux:
Process echo:
Process console:
Process dmx:
Traceback (most recent call last):
Process log:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/p

KeyboardInterrupt: 

  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-144-b9eb48375bce>", line 54, in launch_console_connector
    consonn.start(pipe)
  File "<ipython-input-144-b9eb48375bce>", line 38, in start
    text =pipe.recv()
  File "<ipython-input-143-829a2423a765>", line 30, in MUX
    action = source.recv_json()
  File "<ipython-input-146-6926075df5d6>", line 20, in DMX
    notif = source.recv_json()
  File "/usr/local/lib/python3.6/dist-packages/zmq/sugar/socket.py", line 665, in recv_json
    msg = self.recv(flags)
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/zmq/sugar/socket.py", line 665, in recv_json
    msg = self.recv(flags)
  File "<ipython-input-145-ce010ebf9ba1>", line 51, in launch_echo
    echom.start()
  File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.s

In [None]:
def inp():
    """Read one line from standard input and print it back."""
    print(input())
    
# Run inp() in a separate process - presumably an experiment to see how
# input() behaves inside a child process (TODO confirm).
p = prc.Process(target=inp)
p.start()
# Last expression of the cell: the child's process id.
p.pid

In [None]:
# Stop the child process `p` started in the previous cell.
p.terminate()