Skip to content

Commit

Permalink
Add delete action
Browse files Browse the repository at this point in the history
Instead of moving the queues which is prone to errors and to
leave broken queues, we can simply delete the queues wich are
empty and they will be recreated in less than a second and
the proper balincing done to them.

This allows us to be blazing fast with minimal impact and without
having to doo too much.

Wont remove the code from the move part yet, that will come in a
different PR
  • Loading branch information
Itxaka committed Feb 5, 2018
1 parent 347be5f commit fd7a971
Showing 1 changed file with 39 additions and 7 deletions.
46 changes: 39 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
class ConfigNotFound(Exception):
pass

class NoMoreQueues(Exception):
pass


class QueueBalancer:
class SignalHandler:
Expand All @@ -48,6 +51,7 @@ def __call__(self, signum, frame):
exit(0)

def __init__(self):
self.action = "delete"
self.retries = 6 # this number * self.wait_time == total number to wait for the check to expire
signal.signal(signal.SIGINT, self.SignalHandler())
self.log = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,6 +94,7 @@ def __init__(self):
self.nodes_url = "{}/api/nodes".format(full_host)
self.queues_url = "{}/api/queues/{}".format(full_host, vhost)
self.queue_status_url = self.queues_url + "/{}"
self.queue_delete_url = self.queues_url + "/{}"
self.policy_url = "{}/api/policies/{}".format(full_host, vhost) + "/{}"
self.sync_url = "{}/api/queues/{}".format(full_host, vhost) + "/{}/actions"

Expand Down Expand Up @@ -259,7 +264,26 @@ def prepare(self):
# fill the destination pool
self.fill_queue_with_destination_nodes(distribution)

def move_queue(self):
def delete_queue(self, queue_name):
response = self.conn.delete(self.queue_delete_url.format(queue_name), params={"if-empty": "true"}, timeout=5)
self.log.debug("Response to delete queue {}: {}".format(queue_name, response.status_code))

def delete_queue_action(self):
start = time.time()
try:
# pop queue from the overloaded pool
queue = self.queue_pool.pop()
except IndexError:
self.log.info("No more queues in the overloaded pool")
self.semaphore.release()
raise NoMoreQueues()

self.delete_queue(queue)
sleep(self.wait_time)
self.log.info("Finished deleting queue {}. It took {} seconds".format(queue, time.time() - start))
self.semaphore.release()

def move_queue_action(self):
start = time.time()
try:
# pop queue from the overloaded pool
Expand Down Expand Up @@ -290,12 +314,20 @@ def go(self):
self.prepare()
self.log.debug("Queue pool has {} items".format(len(self.queue_pool)))
self.log.debug("Destination pool has {} items".format(len(self.destiny_pool)))
self.log.info("Running action {}".format(self.action))
# start threading here
while len(self.queue_pool) > 0 or len(self.destiny_pool) > 0:
self.semaphore.acquire()
t = threading.Thread(target=self.move_queue)
self.log.debug("Starting new thread: {}".format(t.getName()))
t.start()
if self.action == "delete":
while len(self.queue_pool) > 0:
self.semaphore.acquire()
t = threading.Thread(target=self.delete_queue_action)
self.log.debug("Starting new thread: {}".format(t.getName()))
t.start()
elif self.action == "move":
while len(self.queue_pool) > 0 or len(self.destiny_pool) > 0:
self.semaphore.acquire()
t = threading.Thread(target=self.move_queue_action)
self.log.debug("Starting new thread: {}".format(t.getName()))
t.start()

# some housekeeping, if we dont have anymore queues to move that's ok but there may still be
# threads doing some work, so find and join them so we wait for them to finish
Expand All @@ -305,7 +337,7 @@ def go(self):
self.log.info("Waiting for thread {} to finish".format(t.getName()))
t.join()

self.log.info("Finished moving queues. It took {} seconds".format(time.time() - start))
self.log.info("Finished action {} on queues. It took {} seconds".format(self.action, time.time() - start))


if __name__ == '__main__':
Expand Down

0 comments on commit fd7a971

Please sign in to comment.