In [11]:
# Database
import pandas as pd
import sys
import os
import time
import numpy as np
import datetime

# Logging
from v_log import VLogger
import logging
import tqdm

#S3 interaction
from io import StringIO 
import boto3
import json  # to process the outputs of AWS CLI
import subprocess
import re

In [12]:
# Create the SQS client used by every function in this notebook
sqs = boto3.client('sqs')
# Queue carrying the "batch:iteration" work items (used by the get_batch helpers below)
queue_url_batch = "https://sqs.eu-west-1.amazonaws.com/555381533193/get_batch.fifo"
#queue_url_status = "https://sqs.eu-west-1.amazonaws.com/555381533193/status.fifo"
# Queue carrying the instance status messages (used by send_status / receive_status)
queue_url_stat = "https://sqs.eu-west-1.amazonaws.com/555381533193/stat"

# Formats


## get_batch

- Start by running the `send_initial_batchs` function in the loop in Section 1 to generate all the `batch:iter_ini` messages. Each message tells an instance to pick up that batch and start from `iter_ini`. For this initial procedure we obviously start from `iter = 0`.

- Imagine that an EC2 instance has stopped at iteration 100 of batch 4. We then want to remove the previous message for batch 4 and send a new message `4:100`, so that the batch is resumed from iteration 100. This is Section 2: send a new batch iteration and remove the previous message.

## status

- To monitor the status of each instance and see whether it is currently handling a batch, instead of logging into each instance over SSH we will use a queue that keeps the latest state of each instance. For example, if the instance is named NODE1 and it has accepted a `get_batch` message `4:100` (batch 4, starting at iteration 100), the status message will be `NODE1:ON:4:100` (the `send_status` function in Section 3 additionally appends a timestamp).

# Section 1. Create queue of batch messages

In [14]:
def send_initial_batchs(batch_num, iter_ini):
    """Enqueue the starting point of one batch on the get_batch.fifo queue.

    The message body is "<batch_num>:<iter_ini>", the deduplication id is
    built from the same two values, and the message group is
    "batch_initial". Nothing is returned.
    """
    body = f"{batch_num}:{iter_ini}"
    dedup_id = f'{batch_num},{iter_ini}'
    sqs.send_message(
        QueueUrl=queue_url_batch,
        MessageBody=body,
        MessageGroupId="batch_initial",
        MessageDeduplicationId=dedup_id,
        MessageAttributes={},
        DelaySeconds=0,
    )

In [15]:
# iter_ini = 0
# for batch_num in tqdm.tqdm(range(0,36)):
#     if batch_num == 0:
#         send_initial_batchs(batch_num, 9558)
#     elif batch_num == 1:
#         send_initial_batchs(batch_num, 3774)
#     else:
#         send_initial_batchs(batch_num, iter_ini)

100%|██████████████████████████████████████████████████████████████████████████████████| 36/36 [00:02<00:00, 15.57it/s]


## 1.2 Receive the message and remove it from the queue

In [13]:
def sqs_get_batch_iter_from_message(response):
    """Extract (batch, iteration) as integers from an SQS receive response.

    Only the first message is read; its body must look like
    "<batch>:<iter>". A response without "Messages" raises KeyError.
    """
    body = response["Messages"][0]["Body"]
    batch_text, iter_text = body.split(":")
    return int(batch_text), int(iter_text)

def receive_message_sqs_batch_iter():
    """Fetch at most one message from the get_batch.fifo queue.

    IMPORTANT: mind the VisibilityTimeout, see
    https://github.com/aws/aws-sdk-js/issues/1279

    Returns the raw receive_message response unchanged.
    """
    receive_kwargs = dict(
        QueueUrl=queue_url_batch,
        MaxNumberOfMessages=1,
        MessageAttributeNames=['All'],
        VisibilityTimeout=10,  # important!!!
        WaitTimeSeconds=0,
    )
    return sqs.receive_message(**receive_kwargs)

def get_id_message(response):
    """Return the ReceiptHandle of the first message in an SQS receive response."""
    first_message = response["Messages"][0]
    return first_message['ReceiptHandle']

def delete_message(id_message):
    """Delete a message from the batch queue given its receipt handle.

    This has to run within the visibility timeout of the receive call;
    otherwise the message is made available in the queue again.
    """
    sqs.delete_message(ReceiptHandle=id_message, QueueUrl=queue_url_batch)

In [10]:
# # Receive a message
# resp = receive_message_sqs_batch_iter()

# # If no message is present in the queue, finish the code
# if "Messages" not in resp:
#     #sys.exit("No more messages to process")
#     print("No more messages to process")
# else:
#     # Parse it to obtain batch and number of iteration
#     # Add to the log file which batch num and num iter has received
#     batch_num, num_iter = sqs_get_batch_iter_from_message(resp)

#     #Get the identifier of that message to delete it
#     id_message = get_id_message(resp)

#     # Delete it
#     delete_message(id_message)

In [14]:
def read_message_and_delete():
    """Take one message from the batch queue, delete it and return its content.

    Returns:
        (batch_num, num_iter) as integers, or (None, None) when the poll
        returned no message.
    """
    # Receive a message
    resp = receive_message_sqs_batch_iter()

    # An empty poll carries no usable "Messages" entry. Return a placeholder
    # pair so callers that unpack two values keep working; previously the
    # function fell through to the final return and raised UnboundLocalError.
    if not resp.get("Messages"):
        #sys.exit("No more messages to process")
        print("No more messages to process")
        return None, None

    # Parse it to obtain batch and number of iteration
    batch_num, num_iter = sqs_get_batch_iter_from_message(resp)

    # Delete the message using its receipt handle
    delete_message(get_id_message(resp))
    return batch_num, num_iter

In [35]:
# batch_num, num_iter = read_message_and_delete()
# batch_num

4

4

# Section 2. Send a new batch -  iteration 

In [36]:
def send_batch_iter(batch_num, counter_iteration):
    """Enqueue a resume point on the get_batch.fifo queue.

    The message body is "<batch_num>:<counter_iteration>", i.e. the batch and
    the iteration it has reached. The message group is "batch". Nothing is
    returned.
    """
    body = f"{batch_num}:{counter_iteration}"
    dedup_id = f'{batch_num},{counter_iteration}'
    sqs.send_message(
        QueueUrl=queue_url_batch,
        MessageBody=body,
        MessageGroupId="batch",
        MessageDeduplicationId=dedup_id,
        MessageAttributes={},
        DelaySeconds=0,
    )

In [37]:
# imagine I was blocked at batch_num = 0 at counter_iteration = 1000
# batch_num = 0
# counter_iteration = 1000

# send_batch_iter(batch_num, counter_iteration)

# Section 3: Status of the Instance

## 3.1 Send the status

In [None]:
def get_now():
    """Return the current local time formatted as "YYYY-MM-DD HH:MM:SS"."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

In [41]:
def get_current_instance_id():
    """Return the EC2 instance id reported by `ec2-metadata -i`.

    Returns:
        The id as a str (e.g. "i-0045263fc6ad11a3c"), or False when the
        command is unavailable, fails, or prints something unexpected.
    """
    try:
        raw = subprocess.check_output(['ec2-metadata', '-i'])
        # check_output returns bytes. Splitting bytes with a str separator
        # raises TypeError, which the old bare except swallowed, so the
        # function always returned False. Decode first.
        return raw.decode().strip().split(": ")[1]
    except (subprocess.SubprocessError, OSError, ValueError, IndexError):
        # Best effort: not on EC2, tool missing, non-zero exit or odd output.
        return False

In [42]:
#instance_id =  get_current_instance_id() # if you are on a EC2 instance
# desc_inst = 'instance-id: i-0045263fc6ad11a3c\n'
# instance_id = desc_inst.strip().split(": ")[1]

In [43]:
def get_current_instance_name():
    """Return the tagged name of the current EC2 instance.

    The tags are read with `aws ec2 describe-tags`, filtered by the id from
    get_current_instance_id(). The value of the tag whose key is "Name" is
    preferred. If there is no such tag, the first tag's value is returned,
    which is what this function always did before.

    Returns:
        The tag value as a str, or False when the instance id is unknown,
        the command fails, or the output has no usable tag.
    """
    instance_id = get_current_instance_id()
    if instance_id is False:
        return False
    # Argument list instead of a shell string: nothing is interpreted by a shell.
    comando = [
        'aws', 'ec2', 'describe-tags',
        '--filters', f'Name=resource-id,Values={instance_id}',
    ]
    try:
        tags = json.loads(subprocess.check_output(comando))["Tags"]
        # Tag order is not guaranteed to put "Name" first, so look it up by key.
        for tag in tags:
            if tag.get("Key") == "Name":
                return tag["Value"]
        return tags[0]["Value"]
    except (subprocess.SubprocessError, OSError, ValueError,
            KeyError, IndexError, TypeError, AttributeError):
        # Best effort: CLI missing or failing, invalid JSON, or no tags.
        return False

In [44]:
# Resolve this machine's instance name; the helper returns False when the lookup fails.
instance_name = get_current_instance_name()

In [45]:
def send_status(instance_name, status, batch_num, counter_iteration):
    """Send a status message to the SQS queue at `queue_url_stat`.

    The body is
    "<instance_name>:<status>:<batch_num>:<counter_iteration>:<timestamp>",
    where the timestamp comes from get_now().

    Args:
        instance_name: tagged name of the instance.
        status: "ON" if the instance has just taken that message; "OFF"
            otherwise.
        batch_num: batch the instance is processing (ON) / has processed (OFF).
        counter_iteration: ON: iteration starting at / OFF: iteration that
            has ended.

    Returns:
        The raw send_message response, so the caller can keep or inspect it.
        (It used to be discarded and None returned.)
    """
    date_today = get_now()
    message_body = f'{instance_name}:{status}:{batch_num}:{counter_iteration}:{date_today}'
    response = sqs.send_message(
        QueueUrl=queue_url_stat,
        DelaySeconds=0,
        MessageAttributes={},
        MessageBody=(message_body)
    )
    return response

In [46]:
# Hard-coded example values for a manual send_status call
instance_name = "nodo0"  # overrides the name looked up earlier
status = "OFF"  # "ON" / "OFF", see send_status
batch_num = 23  # batch the status refers to
counter_iteration = 169  # iteration reached in that batch

In [47]:
# resp_status = send_status(instance_name, status, batch_num, counter_iteration)

## 3.2 Monitor status

In [3]:
def receive_status():
    """Poll the status queue (`queue_url_stat`) once.

    At most one message is requested per call, with a 10-second visibility
    timeout and no wait time. The raw receive_message response is returned
    unchanged.
    """
    receive_kwargs = dict(
        QueueUrl=queue_url_stat,
        MaxNumberOfMessages=1,
        MessageAttributeNames=['All'],
        VisibilityTimeout=10,
        WaitTimeSeconds=0,
    )
    return sqs.receive_message(**receive_kwargs)

In [21]:
def receive_all_status(max_polls=1000):
    """Poll the status queue repeatedly and collect the distinct message bodies.

    Polling stops at the first response that carries no messages (printing
    "Finished!") or after `max_polls` polls, whichever comes first. Messages
    are not deleted here.

    Args:
        max_polls: upper bound on the number of receive_status() calls.
            Defaults to 1000, the limit that used to be hard-coded.

    Returns:
        A set with the body of every message seen.
    """
    set_messages = set()
    for _ in range(max_polls):
        messages = receive_status().get("Messages")
        if not messages:
            # A missing or empty "Messages" entry means the poll returned nothing.
            print("Finished!")
            break
        # Keep every message of the response, not only the first one.
        set_messages.update(message["Body"] for message in messages)
    return set_messages

In [31]:
# Collect the distinct status message bodies currently readable from the status queue
set_messages = receive_all_status()

Finished!


In [32]:
# One row per status message: the body split on every ":" separator
df_list = [mm.split(":") for mm in list(set_messages)]

In [33]:
# Status bodies are "<id>:<status>:<batch>:<iter>:<YYYY-mm-dd HH:MM:SS>". Because
# the rows were split on every ":", the timestamp is spread over the last three
# columns: "date" holds the date plus the hour, then the minutes, then the seconds.
df = pd.DataFrame(df_list)
df.columns = ["id","status","batch","iter","date","minuts","seconds"]

In [34]:
# The cells come from str.split, so they are strings and this ordering is lexicographic
df.sort_values(["id","batch","status"])

Unnamed: 0,id,status,batch,iter,date,minuts,seconds
8,False,OFF,12,7655,2020-04-21 15,37,36
32,False,ON,12,0,2020-04-19 14,7,27
13,False,OFF,13,7571,2020-04-21 16,26,0
28,False,ON,13,0,2020-04-19 15,37,30
4,False,OFF,14,7469,2020-04-21 15,47,31
39,False,ON,14,0,2020-04-19 15,50,21
41,False,OFF,15,7352,2020-04-21 15,41,19
29,False,ON,15,0,2020-04-19 15,52,33
16,False,OFF,16,7412,2020-04-21 15,39,25
9,False,ON,16,0,2020-04-19 15,54,52


In [18]:
def receive_message_sqs_batch_iter2():
    """Fetch at most one message from the get_batch.fifo queue with VisibilityTimeout=0.

    Same request as receive_message_sqs_batch_iter except for the visibility
    timeout, see https://github.com/aws/aws-sdk-js/issues/1279
    NOTE(review): a zero timeout presumably leaves the message visible to the
    next poll, i.e. this peeks at the queue without hiding the message; confirm.

    Returns the raw receive_message response unchanged.
    """
    receive_kwargs = dict(
        QueueUrl=queue_url_batch,
        MaxNumberOfMessages=1,
        MessageAttributeNames=['All'],
        VisibilityTimeout=0,  # important!!!
        WaitTimeSeconds=0,
    )
    return sqs.receive_message(**receive_kwargs)

In [19]:
# (batch, iteration) pairs read while peeking at the batch queue
saved_queue = list()

# Poll 36 times (presumably one poll per batch sent in Section 1 -- confirm).
for ii in range(0,36):
    resp = receive_message_sqs_batch_iter2()
    # A poll that returns nothing has no "Messages" key; skip it instead of
    # letting the parser raise KeyError and abort the whole cell.
    if "Messages" not in resp:
        continue
    batch_num, num_iter = sqs_get_batch_iter_from_message(resp)
    saved_queue.append((batch_num, num_iter))

In [20]:
# Display the (batch, iteration) pairs collected while peeking at the queue
saved_queue

[(22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781),
 (22, 0),
 (0, 9781)]

In [17]:
# Display the raw SQS response currently held in `resp`
resp

{'ResponseMetadata': {'RequestId': '33e958ce-6b60-51c7-ac88-0fd7442b86b7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '33e958ce-6b60-51c7-ac88-0fd7442b86b7',
   'date': 'Tue, 21 Apr 2020 17:27:39 GMT',
   'content-type': 'text/xml',
   'content-length': '240'},
  'RetryAttempts': 0}}