In [None]:
import pika, random, time, os, json, yaml, math
import pywren_ibm_cloud as pywren
from GanttDisplay import GanttDisplay
from MassiveDisplay import MassiveDisplay
from IPython.display import Image

def worker(args):
    pw_config = json.loads(os.environ.get('PYWREN_CONFIG', ''))
    pika_params = pika.URLParameters(pw_config['rabbitmq']['amqp_url'])
    connection = pika.BlockingConnection(pika_params)
    channel = connection.channel()

    initialTime = 1000 + random.randint(0, 500)
    time_to_work = initialTime
    stage = 0
    stage_complete = False

    while time_to_work > 0:
        worksession = random.randint(0, math.floor(time_to_work/5)) if time_to_work >= 5 else time_to_work
        time_to_work = time_to_work - worksession
        time.sleep(worksession/110)
        stage_complete = False
        if time_to_work < (initialTime * 0.125):
            stage_complete = stage < 2
            stage = 2
        elif time_to_work < (initialTime * 0.9):
            stage_complete = stage < 1
            stage = 1
        channel.publish(exchange='',
                        routing_key=args['qid'], 
                        body='{:02}:{}:{}:{}'.format(
                            args['worker_id'],
                            stage if not stage_complete else stage-1,
                            int(stage_complete),
                            worksession/100
                            )
                        )
    channel.publish(exchange='', routing_key=args['qid'], body='{:02}:{}:{}:{}'.format(args['worker_id'], stage, '1', 0))
    return initialTime/100
    
class MonitorCallback():
    def __init__ (self, progbars):
        self.progbars = progbars
        self.progbars.show()

    def __call__(self, ch, method, properties, body):
        msg = body.decode('utf-8')

        workerid = int(msg[:2])
        stage = int(msg[3:4])
        stage_complete = int(msg[5:6])
        jobdone = float(msg[7:])

        #print('id:',workerid, '  stage:', stage, '  job:',jobdone)
        self.progbars.update(workerid, stage, jobdone, stage_complete)
        
        if self.progbars.isDone():
            ch.stop_consuming()

In [None]:
def monitor(n):
    with open(os.path.expanduser('~/.pywren_config'), 'r') as f:
        secret = yaml.safe_load(f)
    pika_params = pika.URLParameters(secret['rabbitmq']['amqp_url'])
    connection = pika.BlockingConnection(pika_params)
    channel = connection.channel()

    iterdata = [[{'worker_id':i, 'qid':'master-queue'}] for i in range(0, n)]
    progbars = GanttDisplay([i[0]['worker_id'] for i in iterdata])
    channel.queue_declare(queue='master-queue')

    try:
        pw = pywren.ibm_cf_executor(rabbitmq_monitor=True)
        futures = pw.map(worker, iterdata)
        channel.basic_consume(consumer_callback=MonitorCallback(progbars), queue='master-queue')
        channel.start_consuming()
        #results = pw.get_result()
        pw.create_timeline_plots(futures=futures, dst_dir='/home/lab144/Desktop', dst_file_name='plot1')
    finally:
        channel.queue_delete(queue='master-queue')
        channel.close()
        print('Deleted the queue.')

In [None]:
monitor(15)