Sending compressed reports for long datasets
Reference #45

Add a feature to send compressed reports.
If the report length is too large (>2000 dataset instances), then send compressed reports.
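For context, gzip-compressing a JSON report and POSTing it with a gzip Content-Encoding header looks roughly like the sketch below; report_url, auth_token, and the file name are placeholders standing in for the values this module reads from its config.

import gzip
import requests

report_url = "https://api.example.org/reports"  # placeholder for the configured Hub URL
auth_token = "..."                               # placeholder for the configured bearer token

with open("DSR-D1-2019-05-31-urn:node:EXAMPLE.json", "rb") as f:
    payload = gzip.compress(f.read())            # compress the serialized report once

headers = {
    "Authorization": "Bearer " + auth_token,
    "Content-Type": "application/gzip",
    "Content-Encoding": "gzip",
}
response = requests.post(report_url, data=payload, headers=headers)
print(response.status_code, response.reason)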
rushirajnenuji committed May 16, 2019
1 parent 028da87 commit dff04d3
Showing 1 changed file with 102 additions and 30 deletions.
132 changes: 102 additions & 30 deletions src/d1_metrics/d1_metrics/metricsreporter.py
@@ -61,22 +61,28 @@ def __init__(self):
self.logger.addHandler(ch)


def report_handler(self, start_date, end_date, node, unique_pids):
def report_handler(self, start_date, end_date, node, doi_dict):
"""
Creates a Report JSON object, dumps it to a file and sends the report to the Hub.
This is a handler function that manages the entire workflow
:param start_date:
:param end_date:
:param: node
:param: unique_pids
:param: doi_dict
:return: None
"""
json_object = {}
json_object["report-header"] = self.get_report_header(start_date, end_date, node)
json_object["report-datasets"] = self.get_report_datasets(start_date, end_date, unique_pids, node)
with open('./reports/' + ("DSR-D1-" + (datetime.strptime(end_date,'%m/%d/%Y')).strftime('%Y-%m-%d'))+ "-" + node+'.json', 'w') as outfile:
json_object["report-datasets"] = self.get_async_report_datasets(start_date, end_date, node, doi_dict)

with open('./async-reports/' + ("DSR-D1-" + (datetime.strptime(end_date,'%m/%d/%Y')).strftime('%Y-%m-%d'))+ "-" + node+'.json', 'w') as outfile:
json.dump(json_object, outfile, indent=2,ensure_ascii=False)
response = self.send_reports(start_date, end_date, node)

if len(doi_dict) > 2000:
    response = self.send_reports(start_date, end_date, node, compressed=True)
else:
    response = self.send_reports(start_date, end_date, node, compressed=False)

return response


@@ -205,7 +211,8 @@ def generate_instances(self, start_date, end_date, pid_list):
{
"country": {
"terms": {
"field": "geoip.country_code2.keyword"
"field": "geoip.country_code2.keyword",
"missing_bucket":"true"
}
}
},
@@ -226,6 +233,8 @@ def generate_instances(self, start_date, end_date, pid_list):


for i in data["aggregations"]["pid_list"]["buckets"]:
if i["key"]["country"] is None:
i["key"]["country"] = "n/a"
if(i["key"]["format"] == "METADATA"):
if "METADATA" in report_instances:
report_instances["METADATA"]["unique_investigations"] = report_instances["METADATA"]["unique_investigations"] + 1
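Setting missing_bucket on a composite-aggregation source tells Elasticsearch to also emit buckets whose key is null when a document lacks the field, which is why the loop above maps a None country key to "n/a". A minimal sketch of such a source (the field name is taken from the query above):

country_source = {
    "country": {
        "terms": {
            "field": "geoip.country_code2.keyword",
            "missing_bucket": True,  # emit a bucket with key None for docs missing the field
        }
    }
}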
@@ -453,7 +462,7 @@ def get_report_datasets(self, start_date, end_date, unique_pids, node ):
return (report_datasets)


def get_async_report_datasets(self, start_date, end_date, node):
def get_async_report_datasets(self, start_date, end_date, node, doi_dict):
"""
Generates asynchronous dataset instances.
@@ -463,6 +472,8 @@ def get_async_report_datasets(self, start_date, end_date, node):
End date of the report
:param: node
Node identifier for the logs from ES
:param: doi_dict
Dictionary object mapping DOIs to their datasetIdentifierFamily
:return: list object
list of dataset instances as defined in SUSHI format
@@ -607,7 +618,7 @@ async def _work_get_all_datasets_instances(self, doi_dict):
None
"""

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENT_REQUESTS) as executor:
self.logger.info("self.async init : %f sec", time.time() - time_beg)
loop = asyncio.get_event_loop()
tasks = []
@@ -625,7 +636,6 @@ async def _work_get_all_datasets_instances(self, doi_dict):
self.logger.info("self.async end : %f sec", time.time() - time_beg)

report_datasets = []
doi_dict = self.get_es_unique_dois(start_date, end_date, nodeId=node)

self.logger.info("self.get_es_unique_dois : %f sec", time.time() - time_beg)
self.logger.debug("Processing " + str(len(doi_dict)) + " datasets for node " + node)
@@ -720,17 +730,65 @@ def resolvePIDs(self, PIDs):
return PIDs


def send_reports(self, start_date, end_date, node):
def send_reports(self, start_date, end_date, node, compressed=False):
"""
Sends report to the Hub at the specified Hub report url in the config parameters
:return: Nothing
Sends reports to the Hub at the report URL specified in the config parameters.
The DataCite Hub has a limit on the number of dataset instances it can ingest;
if the reports are too large, it returns errors while ingesting them.
To handle large reports, set the `compressed` parameter to True.
:param: start_date
String object representing the beginning of the report
:param: end_date
String object representing the end interval of the report
:param: node
The node to which the report belongs
:param: compressed
Boolean flag indicating whether to send the report gzip-compressed
:return: response
An HTTP response object representing the status of the submitted report
"""
s = requests.session()
s.headers.update(
{'Authorization': "Bearer " + self._config["auth_token"], 'Content-Type': 'application/json', 'Accept': 'application/json'})
with open("./reports/DSR-D1-" + (datetime.strptime(end_date,'%m/%d/%Y')).strftime('%Y-%m-%d')+ "-" + node+'.json', 'r') as content_file:
content = content_file.read()
response = s.post(self._config["report_url"], data=content.encode("utf-8"))

if compressed:

s.headers.update(
{'Authorization': "Bearer " + self._config["auth_token"], 'Content-Type': 'application/gzip', 'Accept': 'gzip', 'Content-Encoding': 'gzip'})


with open("./reports/DSR-D1-" + (datetime.strptime(end_date,'%m/%d/%Y')).strftime('%Y-%m-%d')+ "-" + node+'.json', 'r') as content_file:
# JSON large object data
jlob = content_file.read()

# JSON large object bytes
jlob = jlob.encode("utf-8")

with open(name + ".gzip", mode="w") as f:
f.write(gzip.compress(jlob))

response = s.post(self._config["report_url"], data=gzip.compress(jlob))

else:
s.headers.update(
{'Authorization': "Bearer " + self._config["auth_token"], 'Content-Type': 'application/json',
'Accept': 'application/json'})

with open("./reports/DSR-D1-" + (datetime.strptime(end_date, '%m/%d/%Y')).strftime(
'%Y-%m-%d') + "-" + node + '.json', 'r') as content_file:
content = content_file.read()

response = s.post(self._config["report_url"], data=content.encode("utf-8"))

self.logger.info(response)
self.logger.info(str(response.status_code) + " " + response.reason)
self.logger.info("Headers: " + str(response.headers))
self.logger.info("Content: " + str((response.content).decode("utf-8")))

return response
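A hedged usage sketch (dates are in the %m/%d/%Y format the method expects; the instance name, class name, and node identifier are illustrative):

md = MetricsReporter()  # assumed class name for this module
resp = md.send_reports("05/01/2019", "05/31/2019", "urn:node:EXAMPLE", compressed=True)
if resp.status_code >= 400:
    md.logger.error("Report rejected: %s", resp.content.decode("utf-8"))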

@@ -762,8 +820,8 @@ def scheduler(self):
"""
mn_list = self.get_MN_Dict()
for node in mn_list:
date = datetime(2013, 1, 1)
stopDate = datetime(2013, 12, 31)
date = datetime(2012, 12, 31)
stopDate = datetime(2013, 1, 31)

count = 0
while (date.strftime('%Y-%m-%d') != stopDate.strftime('%Y-%m-%d')):
@@ -777,14 +835,13 @@ def scheduler(self):
start_date, end_date = prevDate.strftime('%m/%d/%Y'),\
date.strftime('%m/%d/%Y')

unique_pids = self.get_unique_pids(start_date, end_date, node, doi=True)
doi_dict = self.get_es_unique_dois(start_date, end_date, nodeId = node)

if (len(unique_pids) > 0):
if (len(doi_dict) > 0):
self.logger.debug("Job " + " : " + start_date + " to " + end_date)

# Uncomment me to send reports to the HUB!
response = self.report_handler(start_date, end_date, node, unique_pids)

response = self.report_handler(start_date, end_date, node, doi_dict)

logentry = "Node " + node + " : " + start_date + " to " + end_date + " === " + str(response.status_code)

@@ -801,10 +858,19 @@ def scheduler(self):
else:
self.logger.debug(
"Skipping job for " + node + " " + start_date + " to " + end_date + " - length of PIDS : " + str(
len(unique_pids)))
len(doi_dict)))


def last_day_of_month(self, date):
"""
Returns the last day of the month for report generation
:param date:
A date object; the last day of that date's month is returned
:return: date object
Last day of the month for the date instance supplied in the parameter
"""
if date.month == 12:
return date.replace(day=31)
return date.replace(month=date.month + 1, day=1) - timedelta(days=1)
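A quick check of the helper (results follow directly from the implementation above; md is an instance of this reporter class):

md.last_day_of_month(datetime(2019, 2, 10))   # -> datetime(2019, 2, 28)
md.last_day_of_month(datetime(2019, 12, 5))   # -> datetime(2019, 12, 31)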
@@ -814,19 +880,24 @@ def get_MN_Dict(self):
"""
Retrieves Member Node identifiers from the https://cn.dataone.org/cn/v2/node/ endpoint
Used to send the reports for different MNs
:return: Set of Member Node identifiers
:return: Dictionary of Member Node identifiers
Key - MN identifier
Value - Full name of the MN
"""
node_url = "https://cn.dataone.org/cn/v2/node/"
resp = requests.get(node_url, stream=True)
root = ElementTree.fromstring(resp.content)
mn_dict = dict()

for child in root:
identifier = child.find('identifier').text
name = child.find('name').text
mn_dict[identifier] = name
if child.get('type') == "mn":
identifier = child.find('identifier').text
name = child.find('name').text
mn_dict[identifier] = name

return(mn_dict)
return (mn_dict)
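Filtering on type == "mn" skips coordinating nodes, so the result maps only Member Node identifiers to their full names. A usage sketch (the node shown is illustrative):

mn_dict = md.get_MN_Dict()
for identifier, name in mn_dict.items():
    print(identifier, "-", name)  # e.g. urn:node:KNB - Knowledge Network for Biocomplexity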


def get_es_unique_dois(self, start_date, end_date, nodeId = None):
@@ -1036,4 +1107,5 @@ async def work_get_identifier_family(doi_dict):
# md.report_handler("05/01/2018", "05/30/2018")
# md.get_unique_pids("05/01/2018", "05/31/2018")
md.scheduler()
# md.resolvePIDs(["doi:10.18739/A2X65H"])
# md.resolvePIDs(["doi:10.18739/A2X65H"])
# md.get_MN_Dict()
