
MCCIServerOperation

Toby Schneider edited this page May 26, 2016 · 3 revisions

MCCI: Server Operation

The MCCI server brokers all inter-client communication, including communication between clients on different nodes.

Bootup

TBD: the order of these steps

  1. The server configuration is loaded

    1. Node Address
    2. Soft limit on clients' request timeouts (must be communicated during the client-server handshake)
    3. Retention timeout for foreign data: how long the server caches incoming foreign data. This setting may not be necessary; if it is kept, 10 seconds would be on the high side.
    4. Distribution Schema to use
    5. Distribution Policy to use
  2. The Distribution Schema is loaded

    1. The SHA-1 of the Distribution Schema is calculated
  3. The Distribution Policy is loaded

Client Connection

TBD

Accepting and Processing Requests

With no outstanding requests, the server does nothing: it ignores every data packet sent to it.

Note: the following algorithm might be better implemented with threads and scheduling, but it is presented here as iteration in Python for simple if/then/else clarity.

class Server:
    def __init__(self, network_address):
        self.network_address = network_address  # this node's network address
        self.unfulfilled_requests = []  # request packets not yet fulfilled
        self.already_forwarded = set()  # request packets that have been forwarded
        self.latest = {0: {}}  # latest revisions, keyed on network_address, then variable_id (0 = local)
        self.memory = {}  # data packets, keyed on variable_id
        # self.deliver_matching_packets()   is really handled by 0mq publish-subscribe
        # self.forward_request_to_clients() is handled by 0mq request-response(?)
        # self.respond_to_production()      is handled by 0mq request-response
        # DataPacket, is_elapsed() and self.data_available() are not defined here

    def accept_production_packet(self, production_packet):
        # convert into data packet, store in memory, and forward
        # a variable that has never been published starts at revision 1
        new_revision = self.latest[0].get(production_packet.variable_id, 0) + 1
        if 0 != production_packet.response_id:
            self.respond_to_production(production_packet.response_id, new_revision)
        data_packet = DataPacket()
        data_packet.network_address = self.network_address
        data_packet.variable_id     = production_packet.variable_id
        data_packet.revision        = new_revision
        data_packet.payload         = production_packet.payload
        self.memory[data_packet.variable_id] = data_packet
        self.latest[0][data_packet.variable_id] = new_revision
        self.process_requests()

    def accept_data_packet(self, data_packet):
        # just record the latest revision and pass it along
        revisions = self.latest.setdefault(data_packet.network_address, {})
        local_rev = revisions.get(data_packet.variable_id, 0)
        revisions[data_packet.variable_id] = max(local_rev, data_packet.revision)
        self.process_requests()  # with the new packet, somehow

    def accept_request_packet(self, request_packet):
        # TBD: make sure request_packet.timeout is within allowable limits
        self.unfulfilled_requests.append(request_packet)

    def process_requests(self):
        # iterate over a copy, because finished requests are removed inside the loop
        for request_packet in list(self.unfulfilled_requests):
            if is_elapsed(request_packet.timeout):
                # done with this request
                self.unfulfilled_requests.remove(request_packet)
                self.already_forwarded.discard(request_packet)
            elif -1 == request_packet.network_address or 0 == request_packet.variable_id:
                # wildcard: return any matching packets to the client that sent this request
                # (checked before the lookups below, since a wildcard is never a key in self.latest)
                self.deliver_matching_packets(request_packet)
                if request_packet in self.already_forwarded:
                    raise Exception("wildcard request must never have been forwarded")
            elif request_packet.network_address not in self.latest:
                pass  # do nothing -- keep waiting until the timeout just in case it shows up
            elif request_packet.variable_id not in self.latest[request_packet.network_address]:
                pass  # do nothing -- keep waiting until the timeout just in case it shows up
            elif 0 == request_packet.network_address and (
                    0 == request_packet.revision or
                    request_packet.revision == self.latest[0][request_packet.variable_id]):
                # this means they requested the latest revision of locally-published data
                # the server has this data in memory. return to client that sent this request
                self.deliver_matching_packets(request_packet)
                self.unfulfilled_requests.remove(request_packet)  # done
                if request_packet in self.already_forwarded:
                    raise Exception("local request must never have been forwarded")
            elif self.data_available(request_packet):
                # TBD: true once the data from another host has been delivered to this server
                self.deliver_matching_packets(request_packet)  # send to the client that requested
                self.unfulfilled_requests.remove(request_packet)  # done
                self.already_forwarded.discard(request_packet)
            elif request_packet not in self.already_forwarded:
                self.forward_request_to_clients(request_packet)
                self.already_forwarded.add(request_packet)
            else:
                pass  # the data was requested but it has not arrived yet, so wait
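
The algorithm above calls `is_elapsed()` without defining it and leaves the timeout limit check as TBD. One possible sketch follows, assuming that a request's `timeout` field holds an absolute deadline in seconds on the `time.monotonic()` clock and that the server's soft limit caps the timeout rather than rejecting the request. Both are assumptions, not decisions recorded on this page, and `clamp_timeout` and `make_deadline` are hypothetical helper names.

```python
import time
from typing import Optional


def clamp_timeout(requested_seconds: float, soft_limit_seconds: float) -> float:
    """Limit a client's requested timeout to the server's soft limit."""
    if requested_seconds < 0:
        raise ValueError("timeout must be non-negative")
    return min(requested_seconds, soft_limit_seconds)


def make_deadline(timeout_seconds: float, now: Optional[float] = None) -> float:
    """Convert a relative timeout into an absolute monotonic deadline."""
    if now is None:
        now = time.monotonic()
    return now + timeout_seconds


def is_elapsed(deadline: float, now: Optional[float] = None) -> bool:
    """Return True once the deadline has been reached."""
    if now is None:
        now = time.monotonic()
    return now >= deadline
```

A monotonic clock is used so that wall-clock adjustments on the host cannot expire or extend outstanding requests. The optional `now` argument exists only to make the helpers easy to exercise without waiting.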

Server-published Variables

The server publishes several variables that describe its own operation. Each of these might be expanded into a set of time-windowed values (e.g. last minute, last 5 minutes, last 10 minutes, all time).

  1. The list of clients connected
  2. The list of local variables published
  3. Map of clients to the variables they publish
  4. Map of clients to the variables they request
  5. Map of variables to the clients that have published them
  6. Map of variables to the clients that have requested them
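
The bookkeeping behind these variables could be kept in a handful of sets and maps. The sketch below is a hypothetical illustration only: the class name `ServerStats`, its method names, and the use of plain client and variable identifiers are assumptions, and it covers all-time totals rather than the time-windowed variants mentioned above.

```python
from collections import defaultdict


class ServerStats:
    """Tracks which clients publish and request which variables (hypothetical)."""

    def __init__(self):
        self.clients = set()                         # 1. connected clients
        self.published_by_client = defaultdict(set)  # 3. client -> variables published
        self.requested_by_client = defaultdict(set)  # 4. client -> variables requested
        self.publishers_of = defaultdict(set)        # 5. variable -> publishing clients
        self.requesters_of = defaultdict(set)        # 6. variable -> requesting clients

    def connect(self, client_id):
        self.clients.add(client_id)

    def record_publish(self, client_id, variable_id):
        # Update both directions so either map can be published directly.
        self.published_by_client[client_id].add(variable_id)
        self.publishers_of[variable_id].add(client_id)

    def record_request(self, client_id, variable_id):
        self.requested_by_client[client_id].add(variable_id)
        self.requesters_of[variable_id].add(client_id)

    def local_variables(self):
        # 2. every variable that some local client has published, in sorted order
        return sorted(self.publishers_of)
```

Keeping both directions of each map costs extra memory but lets the server publish any of the six variables without recomputing an inverse on every update.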