In [1]:
# -*- coding: utf-8 -*-
"""Master controller main program.

1. Creates a Resource Manager object:
   :class:`sip.common.resource_manager.ResourceManager`.
2. Starts the logger (log aggregator) as a PaaS service.
3. Loads the slave configuration JSON file (``slave_map.json``).
4. Creates the Master Controller state machine:
   :class:`sip.master.master_states.MasterControllerSM`.
5. Creates the Master Controller RPC interface:
   :class:`sip.master.rpc_service.RpcService`.
6. Enters an event loop waiting for command line inputs.

The master controller implements a simple state machine (see
`state machine uml`_).

.. _state machine uml: https://goo.gl/Xyri5Q
"""

import sys
import json
import threading
import subprocess
import os
import time
import logging.handlers
from rpyc.utils.server import ThreadedServer

In [2]:
# Publish this machine's node name as the SIP_HOSTNAME environment variable.
# This has to happen before the remaining SIP modules are imported.
os.environ['SIP_HOSTNAME'] = os.uname().nodename

In [3]:
from sip.common.resource_manager import ResourceManager
from sip.common.docker_paas import DockerPaas as Paas
from sip.master import config

In [4]:
# Authorship metadata for the master controller main program.
__author__ = 'David Terrett + Brian McIlwrath'

In [5]:
# Root of the SIP tree: the 'sip/' directory below the current working
# directory (the trailing slash is kept as part of the value).
sip_root = os.path.join(os.getcwd(), 'sip/')

# Both JSON configuration files live in the 'etc' directory under the root.
config_file, resources_file = (
    os.path.join(sip_root, 'etc', file_name)
    for file_name in ('slave_map.json', 'resources.json')
)

In [6]:
# Load the resource table from its JSON file. The file is only needed for
# the duration of the parse, so it is closed again before anything else
# (service start-up, further imports) takes place.
with open(resources_file) as f:
    _resources = json.load(f)

# When a 'localhost' resource is defined, its 'sip_root' entry is always
# replaced by the SIP root computed above. (An earlier version only did so
# when the entry was set to the placeholder '#cwd', substituting the current
# working directory.)
# FIXME(FD) Check this is an acceptable change.
if 'localhost' in _resources:
    _resources['localhost']['sip_root'] = sip_root

# Show the resource table: one numbered line per resource followed by its
# key/value settings.
print('Resource table:')
for i, resource in enumerate(_resources):
    print('[{:03d}] {}'.format(i, resource))
    for key, value in _resources[resource].items():
        print('  - {} {}'.format(key, value))

# Create the resource manager and publish it through the shared config module.
config.resource = ResourceManager(_resources)

# Start the logging server as a PaaS service, handing over the standard
# library's default TCP logging port and the command that runs the logging
# server script. The returned service handle is kept in the shared config
# module.
paas = Paas()
config.logserver = paas.run_service('logging_server', 'sip',
    [logging.handlers.DEFAULT_TCP_LOGGING_PORT],
    ['python3', 'sip/common/logging_server.py'])

# NOTE(review): these project imports are deliberately kept after the
# logging service has been started - presumably sip.common.logging_api
# connects to it on import. Confirm before moving them to the import cell.
from sip.common.logging_api import log
from sip.master.master_states import MasterControllerSM
from sip.master.master_states import Standby
from sip.master.slave_poller import SlavePoller
from sip.master.rpc_service import RpcService
from sip.master.reconnect import reconnect

# Wait until it initializes
time.sleep(1.0)

Resource table:
[000] localhost
  - sip_root /mnt/scratch/software/SKA/integration-prototype/sip/
  - launch_protocol ['docker', 'spark']


In [7]:
# Find the logging server's container by name and attach to it in a separate
# terminal window.
import docker
client = docker.from_env()
logcontainer = client.containers.list(filters={"name": config.logserver.name})
if not logcontainer:
    # Fail with an explicit message instead of a bare IndexError below.
    raise RuntimeError('No running container found for logging service '
                       '{!r}'.format(config.logserver.name))

# The command is passed to gnome-terminal as an argument list rather than as
# a concatenated shell string, so the container name is never interpreted by
# a shell. The exit status is the cell's result, as it was with os.system().
attach_cmd = 'docker attach ' + logcontainer[0].name
subprocess.call(['gnome-terminal', '-e', attach_cmd])

0

In [8]:
# Parse the slave configuration JSON file and store the result on the
# shared config module.
with open(config_file) as slave_map_file:
    config._slave_config_dict = json.load(slave_map_file)

In [9]:
# Create the master controller state machine and store it on the shared
# config module, where the slave poller and the command loop below pick it up.
config.master_controller_state_machine = MasterControllerSM()

In [10]:
# Build the slave poller around the master controller state machine,
# then start it.
slave_poller = SlavePoller(config.master_controller_state_machine)
slave_poller.start()

In [11]:
# This starts the rpyc 'ThreadedServer' - this creates a new
# thread for each connection on the given port.
server = ThreadedServer(RpcService, port=12345)

# Run the server in a background daemon thread so it does not block the
# notebook and does not keep the process alive on exit. The daemon flag is
# set via the constructor because Thread.setDaemon() is deprecated.
t = threading.Thread(target=server.start, daemon=True)
t.start()

In [12]:
# Attempt to connect to existing services
reconnect(paas)

In [None]:
# Interactive test loop: events typed on the terminal are posted to the
# master controller state machine.
sm = config.master_controller_state_machine
while True:
    # While a 'docker_swarm' path exists in the working directory no input
    # is read; the loop just idles and checks again a second later.
    if os.path.exists("docker_swarm"):
        time.sleep(1)
        continue

    # Read one line from the terminal and split it into words.
    words = input('** Enter command:\n').split()

    # An empty line prints the list of allowed commands.
    if not words:
        print('** Allowed commands: online, offline, shutdown, '
               'cap [name] [task]')
        continue

    command = words[0]

    # 'state' only reports the current state; no event is posted.
    if command == 'state':
        log.info('CLI: Current state: {}'.format(sm.current_state()))
        continue

    # Anything else is posted as an event (the full word list is passed on).
    log.info('CLI: !!! Posting event ==> {}'.format(command))
    outcome = sm.post_event(words)
    if outcome == 'rejected':
        log.warn('CLI: not allowed in current state')
    elif outcome == 'ignored':
        log.warn('CLI: command ignored: {}'.format(command))
    else:
        # Report the state the machine is in after the event.
        log.info('CLI: master controller state: {}'.format(
            sm.current_state()))

** Enter command:
online
