Queue balancer: take the node list from get_nodes(), survive connection
errors while polling a queue, release the semaphore on early exits and log
the total run time.

Review notes (text ahead of the first "diff --git" header is ignored by
git apply):
- ordered_queue_list() seeds its result from get_nodes() and groups queues
  with dict.setdefault(), so a queue that reports a node missing from that
  listing is still grouped instead of raising KeyError.
- tests.py now includes a node that owns no queues.
- tools/generate_queues.py ends with a trailing newline.
- No hunk here defines self.nodes_url or imports "time" / "requests";
  confirm main.py already provides them.
- Line breaks, indentation and blank context lines were rebuilt from a
  whitespace-collapsed copy of this patch, and the "index" post-image hashes
  predate the edits above. Run `git apply --check` before applying.

diff --git a/main.py b/main.py
index f8620a2..5c02406 100644
--- a/main.py
+++ b/main.py
@@ -98,7 +98,7 @@ def __init__(self):
                 "ha-mode": "nodes",
                 "ha-params": []
             },
-            "priority": 990,
+            "priority": 600,
             "apply-to": "queues"
         }
 
@@ -144,22 +144,24 @@ def get_queues(self):
         response = self.conn.get(self.queues_url).json()
         return response
 
+    def get_nodes(self):
+        response = self.conn.get(self.nodes_url).json()
+        return response
+
     def ordered_queue_list(self):
         # type: (None) -> dict
         """
 
         :return: a dictionary of hosts and their queues
         """
-        queues_ordered_by_host = {}
-        # FIXME: this assumes that all nodes have at least 1 queue. if they dont they wont appear here.
-        # so maybe a different approach to getting the cluster nodes is preferred (/api/nodes ?)
         queues = self.get_queues()
+        nodes = self.get_nodes()
+        # seed every listed node so that nodes without queues still appear;
+        # setdefault below still accepts a queue whose node is not in that list
+        queues_ordered_by_host = {node["name"]: [] for node in nodes}
         self.log.info("There is a total of {} queues".format(len(queues)))
         for queue in queues:
-            if queue["node"] in queues_ordered_by_host:
-                queues_ordered_by_host[queue["node"]].append(queue["name"])
-            else:
-                queues_ordered_by_host[queue["node"]] = [queue["name"]]
+            queues_ordered_by_host.setdefault(queue["node"], []).append(queue["name"])
         return queues_ordered_by_host
 
     def calculate_queue_distribution(self, queue_list):
@@ -200,31 +202,45 @@ def apply_policy(self, queue_name, target):
         data = copy.deepcopy(self.policy_new_master)
         data["definition"]["ha-params"] = [target]
         data["pattern"] = "^{}$".format(queue_name)
-        self.log.debug("Applying policy to {}: {}".format(queue_name, data))
+        self.log.info("Applying policy to {}: {}".format(queue_name, data))
         # move queue into its new master
         self.conn.put(self.policy_url.format(policy_name), json=data)
 
     def wait_until_queue_moved_to_new_master(self, queue_name, target):
         # type: (str, str) -> None
-        self.sync_queue(queue_name)
-        while self.check_status(queue_name)["node"] != target:
-            self.log.debug("Queue {} still not moved to {}".format(queue_name, target))
-            sleep(self.wait_time)
+        moved = False
+        while not moved:
+            status = self.check_status(queue_name)
+            self.log.debug("queue {} status is {}".format(queue_name, status))
+            if "error" in status:
+                self.log.error("Found an error while checking queue {}: {}".format(queue_name, status["error"]))
+                # end checking
+                moved = True
+            else:
+                if status["node"] != target:
+                    self.log.info("Queue {} still not moved to {}".format(queue_name, target))
+                    sleep(self.wait_time)
+                else:
+                    moved = True
 
     def delete_policy(self, queue_name):
         # type: (str) -> None
         policy_name = self.policy_name(queue_name)
-        self.log.debug("Deleting policy {}".format(policy_name))
+        self.log.info("Deleting policy {}".format(policy_name))
         response = self.conn.delete(self.policy_url.format(policy_name))
         self.log.debug("Response of delete_policy: {}".format(response.status_code))
 
     def check_status(self, queue_name):
         # type: (str) -> dict
-        response = self.conn.get(self.queue_status_url.format(queue_name)).json()
+        try:
+            response = self.conn.get(self.queue_status_url.format(queue_name)).json()
+        except requests.ConnectionError:
+            response = {"error": "Connection error"}
         return response
 
     def sync_queue(self, queue_name):
         # type: (str) -> None
+        self.log.info("Syncing queue {}".format(queue_name))
         response = self.conn.post(self.sync_url.format(queue_name), json={"action": "sync"})
         self.log.debug("Response from sync_queue: {}".format(response.status_code))
 
@@ -245,12 +261,14 @@ def move_queue(self):
             queue = self.queue_pool.pop()
         except IndexError:
             self.log.info("No more queues in the overloaded pool")
+            self.semaphore.release()
             return
         try:
             # pop node from the destiny pool
             target = self.destiny_pool.pop()
         except IndexError:
             self.log.info("No more nodes in the destination pool")
+            self.semaphore.release()
             return
         self.log.info("Started moving queue {} to {}".format(queue, target))
         self.apply_policy(queue, target)
@@ -262,7 +280,8 @@ def move_queue(self):
         self.semaphore.release()
 
     def go(self):
-        self.log.debug("Starting queue balancing")
+        start = time.time()
+        self.log.info("Starting queue balancing")
         self.prepare()
         self.log.debug("Queue pool has {} items".format(len(self.queue_pool)))
         self.log.debug("Destination pool has {} items".format(len(self.destiny_pool)))
@@ -281,6 +300,8 @@ def go(self):
             self.log.info("Waiting for thread {} to finish".format(t.getName()))
             t.join()
 
+        self.log.info("Finished moving queues. It took {} seconds".format(time.time() - start))
+
 
 if __name__ == '__main__':
     q = QueueBalancer()
diff --git a/tests.py b/tests.py
index 2db46fd..439a67e 100644
--- a/tests.py
+++ b/tests.py
@@ -41,6 +41,13 @@ def setUpClass(cls):
             {"node": "3", "name": "q5"}
         ]
 
+        cls.nodes_list = [
+            {"name": "1"},
+            {"name": "2"},
+            {"name": "3"},
+            {"name": "4"}
+        ]
+
     def setUp(self):
         config = u"""[default]
 username = username
@@ -81,8 +88,11 @@ def test_distribution_returns_integers_even_if_unbalanced(self):
             self.assertIs(type(distributed[k]), int)
 
     @patch("main.QueueBalancer.get_queues")
-    def test_ordered_queue_list(self, get_queues_mock):
+    @patch("main.QueueBalancer.get_nodes")
+    def test_ordered_queue_list(self, get_nodes_mock, get_queues_mock):
         get_queues_mock.return_value = self.queues_list
+        get_nodes_mock.return_value = self.nodes_list
         # from http api list to nice dictionary
-        self.assertEqual(self.balancer.ordered_queue_list(), {"1": ["q1", "q2", "q3"], "3": ["q5"], "2": ["q4"]})
+        # node "4" owns no queues and must still be listed
+        self.assertEqual(self.balancer.ordered_queue_list(), {"1": ["q1", "q2", "q3"], "3": ["q5"], "2": ["q4"], "4": []})
 
diff --git a/tools/generate_queues.py b/tools/generate_queues.py
new file mode 100644
index 0000000..357b627
--- /dev/null
+++ b/tools/generate_queues.py
@@ -0,0 +1,14 @@
+import pika
+
+parameters = pika.ConnectionParameters(
+    host="localhost",
+    virtual_host="/",
+    credentials=pika.PlainCredentials("guest", "guest")
+)
+connection = pika.BlockingConnection(parameters)
+channel = connection.channel()
+
+for i in range(9000):
+    channel.queue_declare(queue="channel{}".format(i), durable=True)
+
+connection.close()