Discuss: BEP-14 #40
Hey guys, so I drafted a class using some of your code. The implementation follows Alberto's suggested implementation and optimization: basically I'm prioritizing nodes by low latency using `heapq`, keeping the heap sorted by the elapsed time of the current request, and initializing it (`load_latency`) with the elapsed time of a `HEAD` request to every node in the list.

```python
from collections import namedtuple
from heapq import heappush, heappop

from requests import Session

from .exceptions import HTTP_EXCEPTIONS, TransportError

HttpResponse = namedtuple('HttpResponse', ('status_code', 'headers', 'data'))

# Penalty (in milliseconds) added to a node's priority when a request fails
DELAY = 1000 * 10


class LoadBalancer:
    """Connection load balancer."""

    def __init__(self, *, nodes, headers=None):
        self.heap = []
        self.nodes = []
        self.tries = 0
        self.session = Session()
        self.init_nodes(nodes, headers)
        self.load_latency()

    def init_nodes(self, nodes, headers=None):
        if nodes and isinstance(nodes[0], dict):
            self.nodes = nodes
        else:
            for url in nodes:
                self.nodes.append({'endpoint': url, 'headers': headers})

    def load_latency(self):
        # Initialize every node's priority with the elapsed time of a
        # HEAD request. The endpoint string is a tie-breaker so heapq
        # never has to compare two node dicts.
        for node in self.nodes:
            response = self.session.request(
                method='HEAD',
                url=node['endpoint'],
                headers=node['headers'],
            )
            millis = response.elapsed.microseconds / 1000.0
            heappush(self.heap, (millis, node['endpoint'], node))

    def load_node(self, method, path=None, json=None,
                  params=None, headers=None, **kwargs):
        while True:
            time, _, node = heappop(self.heap)
            response = self.session.request(
                method=method,
                url=node['endpoint'],
                json=json,
                params=params,
                headers=headers,
                **kwargs
            )
            text = response.text
            if 200 <= response.status_code < 300:
                # Update the node with its new elapsed time
                # and return the actual response
                self.tries = 0
                millis = response.elapsed.microseconds / 1000.0
                heappush(self.heap, (millis, node['endpoint'], node))
                try:
                    json = response.json()
                except ValueError:
                    json = None
                data = json if json is not None else text
                return HttpResponse(response.status_code,
                                    response.headers, data)
            elif self.tries > 2:
                # We are done trying; raise an error
                self.tries = 0
                exc_cls = HTTP_EXCEPTIONS.get(response.status_code,
                                              TransportError)
                raise exc_cls(response.status_code, text, json)
            else:
                # Try again, but don't retry this node for a DELAY while
                self.tries += 1
                heappush(self.heap, (time + DELAY, node['endpoint'], node))
```

Please let me know your opinions.
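To see the heap bookkeeping in isolation, here is a minimal sketch with no network calls; the endpoint names and latencies are fabricated, but the tuple shape matches the draft (priority first, so `heapq` orders nodes by latency):

```python
from heapq import heappush, heappop

# Hypothetical measured latencies in milliseconds (endpoint, latency)
measurements = [('http://node-a', 42.0), ('http://node-b', 17.5),
                ('http://node-c', 80.3)]

heap = []
for endpoint, millis in measurements:
    # Priority (latency) first; the endpoint string breaks ties
    heappush(heap, (millis, endpoint))

# The node with the lowest latency is always popped first
best_latency, best_endpoint = heappop(heap)
print(best_endpoint)   # http://node-b
```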
Hi Camilo, thanks for the preview of the Python code; I really like the idea of using `heapq`. If you want to test your implementation on a live network, you can create an account on our BigchainDB Testnet. Once you sign up for the test network, you'll get a token you have to add to the headers of your requests. The test network runs four nodes, all on Azure, West Europe, so they might have similar latency.
I have a couple of questions related to this implementation:
Hi, thanks! So answering your questions:

> At what time do you expect to run the `load_latency` method?

> Latency changes over time; what if a node that was slow in responding to `load_latency` was just temporarily under heavy load?

```python
def heap_algo(self):
    time, best_node = heappop(self.heap)
    response = request(best_node)
    if response is not ok:
        # try again; don't retry this node for a DELAY while
        heappush(self.heap, (time + DELAY, best_node))
    else:
        # update the node with its new elapsed time
        time = response.elapsed.microseconds / 1000.0
        heappush(self.heap, (time, best_node))
```

`heappop` retrieves the best node from the heap, and after the request we put it back (`heappush`) with its updated latency, so latency stays up to date for every node while the algorithm runs.
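The effect of the `DELAY` penalty can be seen in isolation. In this sketch (fabricated latencies, no real requests), the failing best node is pushed back with the penalty added, so the next pop picks a different node:

```python
from heapq import heappush, heappop

DELAY = 1000 * 10  # penalty in milliseconds, as in the draft

heap = []
heappush(heap, (15.0, 'http://node-a'))
heappush(heap, (60.0, 'http://node-b'))

# node-a wins the first pop, but suppose its request fails...
time, node = heappop(heap)
heappush(heap, (time + DELAY, node))  # ...so it is penalized

# node-b is now preferred until node-a's penalty is beaten
time, node = heappop(heap)
print(node)   # http://node-b
```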
Also, following are the updated method and some new ones for the initial code, which should fix the incorrect logic you mentioned during the call. With this code we keep track of the number of failed tries per node, so after we have tried every node 3 times we raise an error: "All nodes are unreachable".

```python
def __init__(self, *, nodes, headers=None):
    # ....
    self.tries_per_node = {}
    self.init_nodes(nodes)
    self.init_tries_per_node()

def init_tries_per_node(self):
    for node in self.nodes:
        self.tries_per_node[node['endpoint']] = 0

def update_tries(self, node):
    self.tries_per_node[node['endpoint']] += 1

def enough_tries(self):
    for endpoint in self.tries_per_node:
        if self.tries_per_node[endpoint] < 3:
            return False
    return True

def request(self, method, path=None, json=None,
            params=None, headers=None, **kwargs):
    while not self.enough_tries():
        node_response = self.load_node(method, path=path, json=json,
                                       params=params, headers=headers,
                                       **kwargs)
        if node_response is not None:
            return node_response
    raise TransportError('All nodes are unreachable')

def load_node(self, method, path=None, json=None,
              params=None, headers=None, **kwargs):
    time, _, node = heappop(self.heap)
    response = self.session.request(
        method=method,
        url=node['endpoint'],
        json=json,
        params=params,
        headers=headers,
        **kwargs
    )
    text = response.text
    if not (200 <= response.status_code < 300):
        # Try again, but don't retry this node for a DELAY while
        heappush(self.heap, (time + DELAY, node['endpoint'], node))
        self.update_tries(node)
        return None
    else:
        # Update the node with its new elapsed time
        # and return the actual response
        time = response.elapsed.microseconds / 1000.0
        heappush(self.heap, (time, node['endpoint'], node))
        try:
            json = response.json()
        except ValueError:
            json = None
        data = json if json is not None else text
        return HttpResponse(response.status_code,
                            response.headers, data)
```

Of course this is not final code, just to correct the draft and move forward. Let me know any further questions :)
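The per-node retry bookkeeping can be exercised on its own. This standalone sketch (node list invented) mirrors the counters above and shows that `enough_tries` only turns true after every node has failed three times:

```python
class TriesTracker:
    """Standalone version of the per-node retry counters."""

    def __init__(self, endpoints):
        self.tries_per_node = {endpoint: 0 for endpoint in endpoints}

    def update_tries(self, endpoint):
        self.tries_per_node[endpoint] += 1

    def enough_tries(self):
        # True only once every node has failed at least 3 times
        return all(count >= 3 for count in self.tries_per_node.values())


tracker = TriesTracker(['http://node-a', 'http://node-b'])
for _ in range(3):
    tracker.update_tries('http://node-a')
print(tracker.enough_tries())   # False: node-b has not failed yet
for _ in range(3):
    tracker.update_tries('http://node-b')
print(tracker.enough_tries())   # True: every node failed 3 times
```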
I have some further questions about this implementation.
This means that a user of the driver might experience a delay proportional to the number of nodes passed at initialization, because of the blocking, consecutive requests used to estimate the round-trip time. In the worst case, where one or more nodes are unavailable, the delay the user might experience is as high as the socket timeout times the number of unavailable nodes.

There is another detail that might not be obvious (it was not obvious to me, at least): measuring the time for a `HEAD` request … My suggestion to fix this would be to not use …
We have 4 nodes: A, B, C, D. After some time, we have something like … My suggestion to fix it would be to add some logic to …

There is also another thing I didn't consider before: different endpoints have different response times, depending on the amount of computation the endpoint requires. Some endpoints are really fast (e.g. getting a transaction), while others can be slower (e.g. pushing a transaction and waiting until it's stored in a block), so we might end up putting response times for really different jobs in the heap. Given that, I'm wondering if we should proceed with the original, simpler proposal. Thoughts?
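One possible shape for the "decreasing time constant" idea referenced later in the thread, entirely as my own sketch (invented latencies, a logical clock instead of wall time, names not from the BEP): the longer an entry sits unused in the heap, the lower its effective priority, so a node that was penalized or slow once is eventually retried:

```python
# Assumed illustration, not the original proposal: each entry remembers
# when it was recorded, and its effective priority decreases with age.
DECAY = 2.0  # priority discount per clock tick (made-up constant)

def effective_priority(latency, recorded_at, now):
    return latency - DECAY * (now - recorded_at)

def pick(now):
    # node-a was penalized at tick 0 and has sat untouched since;
    # node-b is in active use, so its entry was just refreshed.
    entries = [(10_015.0, 'http://node-a', 0),
               (60.0, 'http://node-b', now)]
    return min(entries,
               key=lambda e: effective_priority(e[0], e[2], now))[1]

print(pick(now=100))    # http://node-b: the penalty still dominates
print(pick(now=6000))   # http://node-a: the penalty has decayed away
```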
Sorry for the late response. I agree with almost everything, just a couple of different thoughts:

Yeah, true, but that's assuming that hitting the same node every time is never going to change its latency, which could contradict your next argument.

With that in mind, it suggests that we are going to use all the endpoints on all the nodes, and the chances of hitting another node (not our best node) increase. But I agree with it; maybe this suggests we should calculate latency per endpoint and modify the queue priority logic.

Another idea on top of your suggestion: we don't hit the same endpoint more than, say, 2-3 times consecutively; if we reach that, we use the decreasing time constant you suggested.

On the other hand, yeah, I'll implement the round-robin approach first, and after that I'll compare it with this method. Thank you!
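A minimal sketch of the round-robin approach mentioned above (my own illustration; the class name and endpoint URLs are invented, not code from the BEP):

```python
from itertools import cycle

class RoundRobinBalancer:
    """Hand out endpoints in fixed rotation, ignoring latency."""

    def __init__(self, endpoints):
        self._endpoints = cycle(endpoints)

    def next_node(self):
        # Each call returns the next endpoint, wrapping around
        return next(self._endpoints)


balancer = RoundRobinBalancer(['http://node-a', 'http://node-b',
                               'http://node-c'])
picked = [balancer.next_node() for _ in range(4)]
print(picked)
# ['http://node-a', 'http://node-b', 'http://node-c', 'http://node-a']
```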
Hey there guys, I just added a PR for BEP-14! Please let me know the next steps and any suggestions.
The purpose of this issue is to serve as a place to openly discuss the specifics of BEP-14.