Skip to content

Commit

Permalink
several changes
Browse files Browse the repository at this point in the history
 - Refactor some of the methods to make them work better.
 - add a method to get the nodes
 - fix tests for the new methods
 - Remember to release the semaphore so other threads can continue
 - Set more log to info levels
 - Add a small script to generate 9k queues for testing purpouses
 - Some more error catch
  • Loading branch information
Itxaka committed Feb 5, 2018
1 parent 80e8dda commit 5a18f17
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 15 deletions.
49 changes: 35 additions & 14 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(self):
"ha-mode": "nodes",
"ha-params": []
},
"priority": 990,
"priority": 600,
"apply-to": "queues"
}

Expand Down Expand Up @@ -144,22 +144,24 @@ def get_queues(self):
response = self.conn.get(self.queues_url).json()
return response

def get_nodes(self):
response = self.conn.get(self.nodes_url).json()
return response

def ordered_queue_list(self):
# type: (None) -> dict
"""
:return: a dictionary of hosts and their queues
"""
queues_ordered_by_host = {}
# FIXME: this assumes that all nodes have at least 1 queue. if they dont they wont appear here.
# so maybe a different approach to getting the cluster nodes is preferred (/api/nodes ?)
queues = self.get_queues()
nodes = self.get_nodes()
queues_ordered_by_host = {node["name"]: [] for node in nodes}
self.log.info("There is a total of {} queues".format(len(queues)))
for queue in queues:
if queue["node"] in queues_ordered_by_host:
queues_ordered_by_host[queue["node"]].append(queue["name"])
else:
queues_ordered_by_host[queue["node"]] = [queue["name"]]
queues_ordered_by_host[queue["node"]].append(queue["name"])
return queues_ordered_by_host

def calculate_queue_distribution(self, queue_list):
Expand Down Expand Up @@ -200,31 +202,45 @@ def apply_policy(self, queue_name, target):
data = copy.deepcopy(self.policy_new_master)
data["definition"]["ha-params"] = [target]
data["pattern"] = "^{}$".format(queue_name)
self.log.debug("Applying policy to {}: {}".format(queue_name, data))
self.log.info("Applying policy to {}: {}".format(queue_name, data))
# move queue into its new master
self.conn.put(self.policy_url.format(policy_name), json=data)

def wait_until_queue_moved_to_new_master(self, queue_name, target):
# type: (str, str) -> None
self.sync_queue(queue_name)
while self.check_status(queue_name)["node"] != target:
self.log.debug("Queue {} still not moved to {}".format(queue_name, target))
sleep(self.wait_time)
moved = False
while not moved:
status = self.check_status(queue_name)
self.log.debug("queue {} status is {}".format(queue_name, status))
if "error" in status:
self.log.error("Found an error while checking queue {}: {}".format(queue_name, status["error"]))
# end checking
moved = True
else:
if status["node"] != target:
self.log.info("Queue {} still not moved to {}".format(queue_name, target))
sleep(self.wait_time)
else:
moved = True

def delete_policy(self, queue_name):
# type: (str) -> None
policy_name = self.policy_name(queue_name)
self.log.debug("Deleting policy {}".format(policy_name))
self.log.info("Deleting policy {}".format(policy_name))
response = self.conn.delete(self.policy_url.format(policy_name))
self.log.debug("Response of delete_policy: {}".format(response.status_code))

def check_status(self, queue_name):
# type: (str) -> dict
response = self.conn.get(self.queue_status_url.format(queue_name)).json()
try:
response = self.conn.get(self.queue_status_url.format(queue_name)).json()
except requests.ConnectionError:
response = {"error": "Connection error"}
return response

def sync_queue(self, queue_name):
# type: (str) -> None
self.log.info("Syncing queue {}".format(queue_name))
response = self.conn.post(self.sync_url.format(queue_name), json={"action": "sync"})
self.log.debug("Response from sync_queue: {}".format(response.status_code))

Expand All @@ -245,12 +261,14 @@ def move_queue(self):
queue = self.queue_pool.pop()
except IndexError:
self.log.info("No more queues in the overloaded pool")
self.semaphore.release()
return
try:
# pop node from the destiny pool
target = self.destiny_pool.pop()
except IndexError:
self.log.info("No more nodes in the destination pool")
self.semaphore.release()
return
self.log.info("Started moving queue {} to {}".format(queue, target))
self.apply_policy(queue, target)
Expand All @@ -262,7 +280,8 @@ def move_queue(self):
self.semaphore.release()

def go(self):
self.log.debug("Starting queue balancing")
start = time.time()
self.log.info("Starting queue balancing")
self.prepare()
self.log.debug("Queue pool has {} items".format(len(self.queue_pool)))
self.log.debug("Destination pool has {} items".format(len(self.destiny_pool)))
Expand All @@ -281,6 +300,8 @@ def go(self):
self.log.info("Waiting for thread {} to finish".format(t.getName()))
t.join()

self.log.info("Finished moving queues. It took {} seconds".format(time.time() - start))


if __name__ == '__main__':
q = QueueBalancer()
Expand Down
10 changes: 9 additions & 1 deletion tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ def setUpClass(cls):
{"node": "3", "name": "q5"}
]

cls.nodes_list = [
{"name": "1"},
{"name": "2"},
{"name": "3"}
]

def setUp(self):
config = u"""[default]
username = username
Expand Down Expand Up @@ -81,8 +87,10 @@ def test_distribution_returns_integers_even_if_unbalanced(self):
self.assertIs(type(distributed[k]), int)

@patch("main.QueueBalancer.get_queues")
def test_ordered_queue_list(self, get_queues_mock):
@patch("main.QueueBalancer.get_nodes")
def test_ordered_queue_list(self, get_nodes_mock, get_queues_mock):
get_queues_mock.return_value = self.queues_list
get_nodes_mock.return_value = self.nodes_list
# from http api list to nice dictionary
self.assertEqual(self.balancer.ordered_queue_list(), {"1": ["q1", "q2", "q3"], "3": ["q5"], "2": ["q4"]})

Expand Down
14 changes: 14 additions & 0 deletions tools/generate_queues.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pika

parameters = pika.ConnectionParameters(
host="localhost",
virtual_host="/",
credentials=pika.PlainCredentials("guest", "guest")
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

for i in range(9000):
channel.queue_declare(queue="channel{}".format(i), durable=True)

connection.close()

0 comments on commit 5a18f17

Please sign in to comment.