# NoPE - Connectivity Manager

The NoPE-Dispatcher uses one `ConnectivityManager`. The manager observes the connection and the remotely connected dispatchers (and their `ConnectivityManager`s). It detects newly connected dispatchers and disconnected dispatchers. Additionally, it sends a status message (in the form of `INopeStatusInfo`), which is interpreted as a heartbeat. The `ConnectivityManager` checks those heartbeats at a defined interval. If a specific amount of time has elapsed without a heartbeat, the remote dispatcher is marked as `slow` -> `warning` -> `dead`. After an additional delay in the state `dead`, the dispatcher is also removed.
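
The escalation described above can be pictured as a simple threshold check on the time since the last heartbeat. The following is only a minimal sketch, not NoPE's implementation: the enum `SketchStatus`, the function `classify`, the `>=` comparisons, and the assumption that every threshold is measured from the last received heartbeat are all hypothetical.

```python
from enum import Enum


class SketchStatus(Enum):
    # Hypothetical status names, used only for this illustration.
    HEALTHY = 0
    SLOW = 1
    WARNING = 2
    DEAD = 3
    REMOVE = 4


# Assumed thresholds in milliseconds since the last heartbeat.
TIMINGS = {"slow": 500, "warn": 1000, "dead": 2000, "remove": 3000}


def classify(now_ms, last_heartbeat_ms, timings=TIMINGS):
    """Map the time elapsed since the last heartbeat to a status."""
    elapsed = now_ms - last_heartbeat_ms
    # Check the largest threshold first, so the most severe state wins.
    if elapsed >= timings["remove"]:
        return SketchStatus.REMOVE
    if elapsed >= timings["dead"]:
        return SketchStatus.DEAD
    if elapsed >= timings["warn"]:
        return SketchStatus.WARNING
    if elapsed >= timings["slow"]:
        return SketchStatus.SLOW
    return SketchStatus.HEALTHY


# A dispatcher whose last heartbeat is 750 ms old would count as slow here.
print(classify(now_ms=1750, last_heartbeat_ms=1000))
```

Running such a check once per check interval would yield the `slow` -> `warning` -> `dead` progression as long as no new heartbeat arrives.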

## Master

By default, one `ConnectivityManager` is elected as `master`. The master is the `ConnectivityManager` with the highest `upTime`.

> Alternatively, a master can be forced.
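
To make the election rule concrete, here is a minimal sketch under stated assumptions. The `ManagerInfo` class, its field names, and the `elect_master` function are hypothetical and are not part of the NoPE API. The sketch also assumes that a forced master always takes precedence over the `upTime` rule.

```python
from dataclasses import dataclass


@dataclass
class ManagerInfo:
    # Hypothetical, simplified view of one ConnectivityManager.
    id: str
    up_time: float
    is_master_forced: bool = False


def elect_master(managers):
    """Pick the forced master if there is one, else the highest up-time."""
    if not managers:
        raise ValueError("no managers to elect from")
    forced = [m for m in managers if m.is_master_forced]
    # Assumption: forced masters win; ties are broken by up-time.
    candidates = forced if forced else managers
    return max(candidates, key=lambda m: m.up_time)


managers = [ManagerInfo("local", up_time=10.0), ManagerInfo("remote", up_time=5.0)]
print(elect_master(managers).id)  # the longest-running manager

managers[1].is_master_forced = True
print(elect_master(managers).id)  # the forced manager
```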

## Synchronizing time

Because we assume that **NoPE** runs on different computing nodes, we have to be able to synchronize the time between those nodes. Therefore, the `ConnectivityManager` is able to sync the time by providing a `timestamp` and an additional `delay` that was needed to reach the call (for instance `ping / 2`).
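
The arithmetic behind such a synchronization can be sketched as follows. This is an illustration only: the function names `compute_offset` and `synced_now` are hypothetical, and the sketch assumes that the remote clock at the moment of reception equals the transmitted `timestamp` plus the `delay`.

```python
def compute_offset(remote_timestamp_ms, delay_ms, local_receive_ms):
    """Estimate how far the local clock is behind the remote clock."""
    # Estimated remote time at the moment the message arrived locally.
    remote_now_ms = remote_timestamp_ms + delay_ms
    return remote_now_ms - local_receive_ms


def synced_now(local_now_ms, offset_ms):
    """Translate a local clock reading into the synchronized time base."""
    return local_now_ms + offset_ms


# The remote sent timestamp 10_000, the one-way delay (ping / 2) was 20 ms,
# and the local clock showed 9_500 when the message arrived.
offset = compute_offset(10_000, 20, 9_500)
print(offset)                     # 520
print(synced_now(9_600, offset))  # 10120
```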


In [1]:
# First, let's import nope (it has to be installed beforehand)
import nope
import asyncio

# Create a communicator:
# We will use the event layer (which just runs internally)
communicator = await nope.getLayer("event")

# Let's create our dispatcher

# 1. Dispatcher simulates our local system
localDispatcher = nope.dispatcher.getDispatcher({
  "communicator":communicator,
  "id": "local"
}, {
  "singleton": False,
  "useBaseServices": False
})

> For Jupyter we need an extra async wrapper to wait for the dispatcher to initialize:

See here for the details in Jupyter: https://n-riesco.github.io/ijavascript/doc/async.ipynb.html

In [2]:
# Let's wait for our element to be ready.
await localDispatcher.ready.waitFor()

True

Now we want to listen to newly connected dispatchers. For this purpose, we create an observer, which will listen to changes.

In [3]:
# Subscribe to changes
def onChange(data, rest):
  # Log the changes
  print("onChange - listener")
  print("\tadded   =", data.added)
  print("\tremoved =", data.removed)


observer = localDispatcher.connectivityManager.dispatchers.onChange.subscribe(onChange)
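
The `added` and `removed` fields passed to the listener can be thought of as the difference between two snapshots of the dispatcher list. The sketch below is a hypothetical helper (`diff_dispatchers` is not part of NoPE) that assumes dispatchers are identified by their string ids.

```python
def diff_dispatchers(previous, current):
    """Return the ids that appeared in and disappeared from the list."""
    prev, cur = set(previous), set(current)
    return {
        "added": sorted(cur - prev),    # ids only present in the new snapshot
        "removed": sorted(prev - cur),  # ids only present in the old snapshot
    }


print(diff_dispatchers([], ["local", "remote"]))
# {'added': ['local', 'remote'], 'removed': []}
```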

Additionally, we want to show the currently connected dispatchers. This data will **always** include our own dispatcher:

In [4]:
# Show our connected Dispatchers
connectedDispatchers = localDispatcher.connectivityManager.dispatchers.data.getContent()
localDispatcherIncluded = localDispatcher.id in connectedDispatchers

# Now let's log our results.
print("connectedDispatchers    =", connectedDispatchers)
print("localDispatcherIncluded =", localDispatcherIncluded)

connectedDispatchers    = []
localDispatcherIncluded = False


Now that we have implemented our listeners and have seen the connected dispatchers (which is only the `"local"` dispatcher), we will add an additional dispatcher. This should result in a call to our `onChange` listener. Additionally, we wait until our `remoteDispatcher` is initialized.

In [5]:
# 2. Dispatcher simulates our remote system
remoteDispatcher = nope.dispatcher.getDispatcher({
  "communicator": communicator,
  "id": "remote"
}, {
  "singleton": False,
  "useBaseServices": False
})


Now we want to see which system is the current master. This should be our `local` dispatcher.

In [6]:
# We expect to be the master, because the localDispatcher has been created first.
print("master =", localDispatcher.connectivityManager.master.id)

onChange - listener
	added   = ['local', 'remote']
	removed = []
master = local


We can now force the remote dispatcher to be our master by setting the master flag. (Later, a base service can be used for this purpose; then we only have to call that service.)

In [7]:
remoteDispatcher.connectivityManager.isMaster = True
localDispatcher.connectivityManager.isMaster = False


await asyncio.sleep(5)

In [8]:
# We expect the master to be the remote.
print("master =", localDispatcher.connectivityManager.master.id)
print("master-info =", localDispatcher.connectivityManager.master)

master = remote
master-info = {'id': 'remote', 'env': 'python', 'version': '1.4.1', 'isMaster': True, 'isMasterForced': True, 'host': {'cores': 8, 'cpu': {'model': 'Intel64 Family 6 Model 142 Stepping 12, GenuineIntel', 'speed': 1992.0, 'usage': 20.3}, 'os': 'Windows 10', 'ram': {'usedPerc': 22.8, 'free': 25058, 'total': 32442}, 'name': 'nz-078'}, 'pid': 14932, 'timestamp': 1667854255621, 'connectedSince': 1667854250554, 'status': 0}


Now let's see what happens if we adapt the heartbeat interval of our *local* instance. The local instance will send a heartbeat every 250 ms, while the remote instance will only send one every 5000 ms:

In [9]:


def renderStatus():
  print("master-info =", localDispatcher.connectivityManager.master.status)


nope.EXECUTOR.setTimeout(renderStatus, 50)
nope.EXECUTOR.setTimeout(renderStatus, 750)
nope.EXECUTOR.setTimeout(renderStatus, 1500)
nope.EXECUTOR.setTimeout(renderStatus, 2500)


await localDispatcher.connectivityManager.setTimings({
  # our system will send a heartbeat every 250 ms.
  "sendAliveInterval": 250,
  # we will check the received heartbeats every 125 ms.
  "checkInterval": 125,
  # dispatchers are marked as slow after not receiving heartbeats for 500 ms
  "slow": 500,
  # dispatchers are marked with a warning flag after 1 s
  "warn": 1000,
  # dispatchers are marked as dead after 2 s
  "dead": 2000,
  # dispatchers are removed after 3 s
  "remove": 3000,
})

await remoteDispatcher.connectivityManager.setTimings({
  # the remote system will only send a heartbeat every 5000 ms.
  "sendAliveInterval": 5000,
})

async def resetTiming():
  await localDispatcher.connectivityManager.setTimings({})
  await remoteDispatcher.connectivityManager.setTimings({})

# We reset the timeouts.
nope.EXECUTOR.setTimeout(resetTiming, 3000)

await asyncio.sleep(5)


master-info = 0
master-info = ENopeDispatcherStatus.SLOW
master-info = ENopeDispatcherStatus.DEAD
