only clear dirty flag when write is successful
jreadey committed May 22, 2020
1 parent 13d8dea · commit 532565f
Showing 2 changed files with 29 additions and 15 deletions.
hsds/datanode_lib.py (4 changes: 2 additions & 2 deletions)
@@ -559,7 +559,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
log.debug(f"removing pending s3 write task for {obj_id}")
del pending_s3_write_tasks[obj_id]
# clear dirty flag
- if obj_id in dirty_ids and dirty_ids[obj_id][0] == last_update_time:
+ if obj_id in dirty_ids and dirty_ids[obj_id][0] == last_update_time and success:
log.debug(f"clearing dirty flag for {obj_id}")
del dirty_ids[obj_id]

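The guard above is the heart of the change: the dirty flag is dropped only when the S3 write reported success and no newer update arrived while the write was in flight. Below is a minimal, self-contained sketch of that bookkeeping pattern; the finish_write helper and the module-level dicts are illustrative stand-ins, not the actual datanode_lib API (in hsds these maps live on the app object).

    import time

    # Illustrative stand-ins for the datanode's bookkeeping maps.
    dirty_ids = {}               # obj_id -> (last_update_time, ...) per the comparison in the diff
    pending_s3_write_tasks = {}  # obj_id -> write task for that object

    def finish_write(obj_id, last_update_time, success):
        """Bookkeeping after an S3 write attempt (hypothetical helper).

        Clear the dirty flag only if the write succeeded *and* the object was
        not updated again while the write was in flight; otherwise the entry
        stays dirty and is retried on the next sync cycle.
        """
        if obj_id in pending_s3_write_tasks:
            del pending_s3_write_tasks[obj_id]
        if obj_id in dirty_ids and dirty_ids[obj_id][0] == last_update_time and success:
            del dirty_ids[obj_id]

    # Usage: a failed write leaves the object dirty so it will be retried.
    dirty_ids["g-0001"] = (time.time(), "mybucket")
    finish_write("g-0001", dirty_ids["g-0001"][0], success=False)
    assert "g-0001" in dirty_ids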
@@ -623,7 +623,7 @@ def callback(future):
log.warn(f"obj {obj_id} has been in pending_s3_write for {s3sync_start - pending_s3_write[s3key]} seconds, restarting")
del pending_s3_write[s3key]
if obj_id not in pending_s3_write_tasks:
log.error(f"Expected to find write task for {obj_id}")
log.warn(f"Expected to find write task for {obj_id}")
else:
task = pending_s3_write_tasks[obj_id]
task.cancel()
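For context, this second hunk sits in the s3sync path that restarts a write that has been pending too long: the stale entry is dropped and its task cancelled so a fresh attempt can be scheduled. A rough sketch of that cancel-and-retry pattern follows; the restart_if_stale helper, the 30-second threshold, and the final deletion of the task entry are added here purely for illustration and are not taken from the hsds code.

    import asyncio
    import time

    pending_s3_write = {}        # s3key -> time the write was started
    pending_s3_write_tasks = {}  # obj_id -> asyncio.Task doing the write

    async def never_finishes():
        await asyncio.sleep(3600)

    def restart_if_stale(obj_id, s3key, s3sync_start, max_pending_secs=30):
        """Drop a write that has been pending too long and cancel its task so
        the sync loop can schedule a fresh attempt (illustrative helper)."""
        if s3key in pending_s3_write and s3sync_start - pending_s3_write[s3key] > max_pending_secs:
            del pending_s3_write[s3key]
            if obj_id not in pending_s3_write_tasks:
                print(f"expected to find write task for {obj_id}")
            else:
                task = pending_s3_write_tasks[obj_id]
                task.cancel()
                del pending_s3_write_tasks[obj_id]

    async def main():
        pending_s3_write["db/g-0001"] = time.time() - 60   # started a minute ago
        pending_s3_write_tasks["g-0001"] = asyncio.ensure_future(never_finishes())
        restart_if_stale("g-0001", "db/g-0001", s3sync_start=time.time())
        assert "g-0001" not in pending_s3_write_tasks

    asyncio.run(main())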
hsds/domain_sn.py (40 changes: 27 additions & 13 deletions)
@@ -668,7 +668,8 @@ async def doFlush(app, root_id, bucket=None):
params["bucket"] = bucket
client = get_http_client(app)
dn_urls = getDataNodeUrls(app)
log.debug(f"dn_urls: {dn_urls}")
log.debug(f"doFlush - dn_urls: {dn_urls}")
failed_count = 0

try:
tasks = []
@@ -679,23 +680,32 @@
done, pending = await asyncio.wait(tasks)
if pending:
# should be empty since we didn't use return_when parameter
log.error("Got pending tasks")
log.error("doFlush - got pending tasks")
raise HTTPInternalServerError()
for task in done:
log.info(f"task: {task}")
log.info(f"doFlush - task: {task}")
if task.exception():
log.warn(f"task had exception: {type(task.exception())}")
raise HTTPInternalServerError()
clientResponse = task.result()
if clientResponse.status != 204:
log.warn(f"expected 204 but got: {clientResponse.status}")
raise HTTPInternalServerError()
log.warn(f"doFlush - task had exception: {type(task.exception())}")
failed_count += 1
else:
clientResponse = task.result()
if clientResponse.status != 204:
log.warn(f"doFlush - expected 204 but got: {clientResponse.status}")
failed_count += 1
except ClientError as ce:
log.error(f"Error for http_put('/groups/{root_id}'): {str(ce)}")
log.error(f"doFlush - ClientError for http_put('/groups/{root_id}'): {str(ce)}")
raise HTTPInternalServerError()
except CancelledError as cle:
log.warn(f"CancelledError '/groups/{root_id}'): {str(cle)}")
log.error(f"doFlush - CancelledError '/groups/{root_id}'): {str(cle)}")
raise HTTPInternalServerError()
log.info(f"doFlush for {root_id} complete, failed: {failed_count} out of {len(dn_urls)}")
if failed_count > 0:
log.error(f"doFlush fail count: {failed_count} returning 500")
return 500
else:
log.info("doFlush no fails, returning 204")
return 204



async def PUT_Domain(request):
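The doFlush changes above switch from raising HTTPInternalServerError on the first bad task to tallying failures across all data nodes and reporting an overall status. Here is a condensed, runnable sketch of that fan-out-and-count pattern; flush_node stands in for the http_put call to each data node, the URLs are made up, and only the 204-on-success convention is taken from the diff.

    import asyncio

    async def flush_node(url):
        # Stand-in for the per-data-node flush request; the real call returns
        # the data node's HTTP status (204 on success).
        await asyncio.sleep(0)
        return 204

    async def do_flush(dn_urls):
        """Ask every data node to flush; report 500 if any of them failed."""
        failed_count = 0
        tasks = [asyncio.ensure_future(flush_node(url)) for url in dn_urls]
        done, pending = await asyncio.wait(tasks)
        # With no return_when argument, asyncio.wait returns only once every
        # task is done, so pending is expected to be empty here.
        for task in done:
            if task.exception():
                failed_count += 1          # request raised instead of returning
            elif task.result() != 204:
                failed_count += 1          # data node answered, but not with 204
        return 500 if failed_count > 0 else 204

    print(asyncio.run(do_flush(["http://dn1:6101", "http://dn2:6101"])))  # 204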
@@ -729,6 +739,7 @@ async def PUT_Domain(request):

if ("flush" in params and params["flush"]) or (body and "flush" in body and body["flush"]):
# flush domain - update existing domain rather than create a new resource
log.info(f"Flush for domain: {domain}")
domain_json = await getDomainJson(app, domain, reload=True)
log.debug(f"got domain_json: {domain_json}")

@@ -747,9 +758,12 @@
aclCheck(domain_json, "update", username) # throws exception if not allowed
if "root" in domain_json:
# nothing to do for folder objects
- await doFlush(app, domain_json["root"], bucket=bucket)
+ status_code = await doFlush(app, domain_json["root"], bucket=bucket)
else:
log.info("flush called on folder, ignoring")
+ status_code = 204
# flush successful
- resp = await jsonResponse(request, None, status=204)
+ resp = await jsonResponse(request, None, status=status_code)
log.response(request, resp=resp)
return resp

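Finally, PUT_Domain now forwards whatever status doFlush reports instead of answering 204 unconditionally. A stripped-down aiohttp sketch of that flush path follows; the stubbed do_flush_stub, the hard-coded domain_json, and the plain web.Response in place of hsds's jsonResponse helper are all simplifications for illustration.

    from aiohttp import web

    async def do_flush_stub(root_id):
        # Stand-in for doFlush: pretend one of the data nodes failed.
        return 500

    async def put_domain(request):
        # Stand-in for getDomainJson(app, domain, reload=True).
        domain_json = {"root": "g-root-0001"}
        if "root" in domain_json:
            # Return whatever status the flush reported (204 or 500).
            status_code = await do_flush_stub(domain_json["root"])
        else:
            # Folder objects have no root group, so there is nothing to flush.
            status_code = 204
        return web.Response(status=status_code)

    app = web.Application()
    app.router.add_put("/", put_domain)
    # web.run_app(app)  # a PUT now answers 500 whenever the flush reported failures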
