Skip to content

Commit

Permalink
registration fix for containers restarting
Browse files Browse the repository at this point in the history
  • Loading branch information
jreadey committed Nov 13, 2020
1 parent 2d959f3 commit 4d31769
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 61 deletions.
3 changes: 3 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ if [ $run_pyflakes ]; then
fi
fi

echo "running setup.py"
python setup.py install

echo "clean stopped containers"
docker rm -v $(docker ps -aq -f status=exited)

Expand Down
4 changes: 2 additions & 2 deletions hsds/basenode.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ async def register(app):
body = {"id": app["id"], "port": outside_port, "node_type": app["node_type"]}
app['register_time'] = int(time.time())
try:
log.debug(f"register req: {req_reg} body: {body}")
log.info(f"register req: {req_reg} body: {body}")
rsp_json = await http_post(app, req_reg, data=body)
if rsp_json is not None:
log.debug(f"register response: {rsp_json}")
log.info(f"register response: {rsp_json}")
app["node_number"] = rsp_json["node_number"]
app["node_count"] = rsp_json["node_count"]
log.info("setting node_state to WAITING")
Expand Down
140 changes: 81 additions & 59 deletions hsds/headnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#
import os
import asyncio
import json
import time

from aiohttp.web import Application, StreamResponse, run_app, json_response
Expand Down Expand Up @@ -205,98 +204,121 @@ async def info(request):
log.response(request, resp=resp)
return resp

def get_request_ip(request):
peername = request.transport.get_extra_info('peername')
if peername is None:
msg = "Can not determine caller IP"
log.error(msg)
raise HTTPBadRequest(reason=msg)
log.debug(f"get_request_ip - peername: {peername}")
if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
host = "localhost"
else:
host = peername[0]
req_port = peername[1]
return (host, req_port)


async def register(request):
""" HTTP method for nodes to register with head node"""
log.request(request)
app = request.app
text = await request.text()
# body = await request.json()
body = json.loads(text)
log.info(f"body: {body}")
if not request.has_body:
msg = "register missing body"
log.warn(msg)
raise HTTPBadRequest(reason=msg)
body = await request.json()
log.info(f"register request body: {body}")
node_host = None
node_port = None
node_type = None
node_id = None
if 'id' not in body:
msg = "Missing 'id'"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
node_id = body['id']
if 'node_type' not in body:
msg = "missing key 'node_type'"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
node_type = body['node_type']
if node_type not in ('sn', 'dn'):
msg=f"invalid node_type: {node_type}"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
if 'port' not in body:
msg = "missing key 'port'"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
node_port = body['port']

if 'ip' not in body:
log.debug("register - get ip/port from request.transport")
peername = request.transport.get_extra_info('peername')
if peername is None:
msg = "Can not determine caller IP"
log.error(msg)
raise HTTPBadRequest(reason=msg)
log.debug(f"peername: {peername}")
if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
host = "localhost"
node_host = "localhost"
else:
host = peername[0]
req_port = peername[1]
log.info(f"register host: {host}, port: {req_port}")

node_host = peername[0]
else:
#Specify the ip is useful in docker / DCOS situations, where in certain situations a
#docker private network IP might be used
host = body["ip"]
log.info("explicit specification of host: {}".format(host))
if 'port' not in body:
msg = "missing key 'port'"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
if 'node_type' not in body:
raise HTTPBadRequest(reason="missing key 'node_type'")
if body['node_type'] not in ('sn', 'dn'):
msg="invalid node_type"
log.response(request, code=400, message=msg)
raise HTTPBadRequest(reason=msg)
node_host = body["ip"]
log.debug("explicit specification of host: {}".format(node_host))

nodes = None
log.info(f"register host: {node_host}, port: {node_port}")

ret_node = None

node_ids = app['node_ids']
if body['id'] in node_ids:
if node_id in node_ids:
# already registered?
log.warn("Node {} is already registered!!! Something may be wrong.".format(body['id']))
ret_node = node_ids[body['id']]
log.warn(f"Node {node_id} is already registered!!! Something may be wrong.")
ret_node = node_ids[node_id]
else:
log.debug(f"Node {body['id']} is unknown, may be a new node coming online.")
log.debug(f"Node {node_id} is unknown, may be a new node coming online.")
nodes = app['nodes']
app['active_sn_count'] = getActiveNodeCount(app, "sn")
app['active_dn_count'] = getActiveNodeCount(app, "dn")

# If the cluster has any failed nodes, replace them. Otherwise, see if the cluster is in the process of growing.
if(body['node_type'] == "dn" or body['node_type'] == "sn"):
replacedNode = False
for node in nodes:
if node['host'] is None and node['node_type'] == body['node_type']:
# found a free node
log.info("Found free node reference: {}".format(node))
node['host'] = host
node['port'] = body["port"]
node['id'] = body["id"]
node['connected'] = unixTimeToUTC(int(time.time()))
node['failcount'] = 0
ret_node = node
node_ids[body["id"]] = ret_node
replacedNode = True
break
if not replacedNode:
node = {"node_number": len(nodes) - 1,
"node_type": body['node_type'],
"host": body['ip'],
"port": body['port'],
"id": body['id'],
"connected": unixTimeToUTC(int(time.time())),
"failcount": 0}
log.debug(f"Added node node_type {node['node_type']} host {node['host']} port {node['port']} id {node['id']} connected {node['connected']} failcount {node['failcount']}")
nodes.append(node)
replacedNode = False
now = unixTimeToUTC(int(time.time()))
for node in nodes:
if node['host'] is None and node['node_type'] == node_type:
# found a free node
log.info(f"Found free node reference: {node}")
node['host'] = node_host
node['port'] = node_port
node['id'] = node_id
node['connected'] = now
node['failcount'] = 0
ret_node = node
node_ids[body["id"]] = ret_node
else:
log.warn("Only sn or dn nodes may be replaced or added to a cluster")

node_ids[node_id] = ret_node
replacedNode = True
break
if not replacedNode:
node = {"node_number": len(nodes) - 1,
"node_type":node_type,
"host": node_host,
"port": node_port,
"id": node_id,
"connected": now,
"failcount": 0}
log.info(f"Added node node_type {node_type} host {node_host} port {node_port} id {node_id} connected {now}")
nodes.append(node)
ret_node = node
node_ids[node_id] = ret_node

if ret_node is None:
log.info("no free node to assign")

inactive_node_count = getInactiveNodeCount(app)
log.info("inactive_node_count: {}".format(inactive_node_count))
log.info(f"inactive_node_count: {inactive_node_count}")
if inactive_node_count == 0:
# all the nodes have checked in
log.info(f"setting cluster state to READY - was: {app['cluster_state']}")
Expand Down Expand Up @@ -341,8 +363,8 @@ async def nodestate(request):
for node in app["nodes"]:
if node["node_type"] == node_type or node_type == "*":
nodes.append(node)
log.debug(f"Added a node in nodestate method, up to {len(nodes)} nodes.")
answer = {"nodes": nodes }
log.debug(f"returning nodestate for {len(nodes)} nodes")
else:
answer = {}
for node in app["nodes"]:
Expand Down

0 comments on commit 4d31769

Please sign in to comment.