
Commit

Added async report generation for dataset reports.

Reference: #46  #56
rushirajnenuji committed May 11, 2019
1 parent cc5f24a commit 028da87
Showing 1 changed file with 194 additions and 0 deletions.
194 changes: 194 additions & 0 deletions src/d1_metrics/d1_metrics/metricsreporter.py
@@ -453,6 +453,200 @@ def get_report_datasets(self, start_date, end_date, unique_pids, node ):
return (report_datasets)


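    # NOTE: assumes `asyncio`, `concurrent.futures`, `time`, and
    # `datetime.datetime` are imported at the top of this module
    # (imports are outside this hunk).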
def get_async_report_datasets(self, start_date, end_date, node):
"""
        Generates dataset instances asynchronously.
:param start_date:
Start date of the report
:param end_date:
End date of the report
        :param node:
Node identifier for the logs from ES
:return: list object
list of dataset instances as defined in SUSHI format
"""

time_beg = time.time()

metrics_elastic_search = MetricsElasticSearch()
metrics_elastic_search.connect()

count = 0
mn_dict = self.get_MN_Dict()
nodeName = mn_dict[node]

def _get_single_dataset_instance(self, doi, pid_list):
"""
            Generates a single instance of a dataset object
:param self:
The self object
:param doi:
The doi for the dataset
:param pid_list:
`datasetIdentifierFamily` for this doi
:return: dictionary object
a single dataset instance as defined in SUSHI format
"""

dataset = {}
pid = pid_list[0]
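            # Query Solr for descriptive metadata using the first PID in the
            # identifier family.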
solr_response = self.query_solr(pid)
if (solr_response["response"]["numFound"] > 0):

if ("title" in (i for i in solr_response["response"]["docs"][0])):
dataset["dataset-title"] = solr_response["response"]["docs"][0]["title"]
else:
dataset["dataset-title"] = ""

if ("authoritativeMN" in (i for i in solr_response["response"]["docs"][0])):
dataset["publisher"] = mn_dict[solr_response["response"]["docs"][0]["authoritativeMN"]]
else:
dataset["publisher"].append(
{"type": "urn", "value": nodeName})

if ("authoritativeMN" in (i for i in solr_response["response"]["docs"][0])):
dataset["publisher-id"] = []
dataset["publisher-id"].append(
{"type": "urn", "value": solr_response["response"]["docs"][0]["authoritativeMN"]})
else:
dataset["publisher-id"].append(
{"type": "urn", "value": node})

dataset["platform"] = "DataONE"

if ("origin" in (i for i in solr_response["response"]["docs"][0])):
contributors = []
for i in solr_response["response"]["docs"][0]["origin"]:
contributors.append({"type": "name", "value": i})
dataset["dataset-contributors"] = contributors

if ("datePublished" in (i for i in solr_response["response"]["docs"][0])):
dataset["dataset-dates"] = []
dataset["dataset-dates"].append(
{"type": "pub-date", "value": solr_response["response"]["docs"][0]["datePublished"][:10]})
else:
dataset["dataset-dates"] = []
dataset["dataset-dates"].append(
{"type": "pub-date", "value": solr_response["response"]["docs"][0]["dateUploaded"][:10]})

if "doi" in pid:
dataset["dataset-id"] = [{"type": "doi", "value": doi}]
else:
pass
# dataset["dataset-id"] = [{"type": "other-id", "value": pid}]

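                # "yop" = year of publication: the YYYY prefix of the first
                # recorded publication date.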
dataset["yop"] = dataset["dataset-dates"][0]["value"][:4]

if ("dataUrl" in (i for i in solr_response["response"]["docs"][0])):
dataset["uri"] = solr_response["response"]["docs"][0]["dataUrl"]

dataset["data-type"] = "dataset"

dataset["performance"] = []
performance = {}

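                # Reporting period for this performance block, normalized from
                # MM/DD/YYYY input to ISO YYYY-MM-DD.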
performance["period"] = {}
performance["period"]["begin-date"] = (datetime.strptime(start_date, '%m/%d/%Y')).strftime('%Y-%m-%d')
performance["period"]["end-date"] = (datetime.strptime(end_date, '%m/%d/%Y')).strftime('%Y-%m-%d')

instance = []

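                # Aggregate metric counts for every PID in the identifier family.
                # METADATA views become "investigation" metrics and DATA downloads
                # become "request" metrics, per the COUNTER Code of Practice for
                # Research Data.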
report_instances = self.generate_instances(start_date, end_date, pid_list)

if ("METADATA" in report_instances):
total_dataset_investigation = {"count": report_instances["METADATA"]["total_investigations"],
"access-method": "regular",
"metric-type": "total-dataset-investigations",
"country-counts": report_instances["METADATA"][
"country_total_investigations"]}

unique_dataset_investigation = {"count": report_instances["METADATA"]["unique_investigations"],
"access-method": "regular",
"metric-type": "unique-dataset-investigations",
"country-counts": report_instances["METADATA"][
"country_unique_investigations"]}
instance.append(total_dataset_investigation)
instance.append(unique_dataset_investigation)

if ("DATA" in report_instances):
total_dataset_requests = {"count": report_instances["DATA"]["total_requests"],
"access-method": "regular",
"metric-type": "total-dataset-requests",
"country-counts": report_instances["DATA"]["country_total_requests"]}

unique_dataset_requests = {"count": report_instances["DATA"]["unique_requests"],
"access-method": "regular",
"metric-type": "unique-dataset-requests",
"country-counts": report_instances["DATA"]["country_unique_requests"]}
instance.append(total_dataset_requests)
instance.append(unique_dataset_requests)

performance["instance"] = instance

dataset["performance"].append(performance)

else:
pass

return dataset

async def _work_get_all_datasets_instances(self, doi_dict):
"""
            For each DOI in the doi_dict, create async jobs and execute them concurrently
:param self:
Class object
:param doi_dict:
Dictionary of `doi` as key and `datasetIdentifierFamily` as value
:return:
None
"""

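            # Run the blocking Solr/ES lookups in a small thread pool,
            # scheduling one task per DOI on the event loop.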
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
self.logger.info("self.async init : %f sec", time.time() - time_beg)
loop = asyncio.get_event_loop()
tasks = []

                for doi, pid_list in doi_dict.items():
                    tasks.append(loop.run_in_executor(executor, _get_single_dataset_instance, self, doi, pid_list))

                self.logger.info("Scheduled %d dataset tasks", len(tasks))

                processed = 0
                for response in await asyncio.gather(*tasks):
                    report_datasets.append(response)
                    processed += 1
                    if processed % 100 == 0:
                        self.logger.info("Processed %d of %d datasets", processed, len(tasks))

self.logger.info("self.async end : %f sec", time.time() - time_beg)

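        # Accumulator shared, via closure, with the coroutine above.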
report_datasets = []
doi_dict = self.get_es_unique_dois(start_date, end_date, nodeId=node)

self.logger.info("self.get_es_unique_dois : %f sec", time.time() - time_beg)
self.logger.debug("Processing " + str(len(doi_dict)) + " datasets for node " + node)

# In a multithreading environment such as under gunicorn, the new thread created by
# gevent may not provide an event loop. Create a new one if necessary.
try:
loop = asyncio.get_event_loop()
        except RuntimeError:
            self.logger.info("Creating new event loop.")
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

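        # Schedule the coroutine and block until every dataset record is built.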
future = asyncio.ensure_future(_work_get_all_datasets_instances(self, doi_dict))
loop.run_until_complete(future)

self.logger.info("total : %f sec", time.time() - time_beg)

return (report_datasets)


def resolve_MN(self, authoritativeMN):
"""
Queries the Node endpoint to retrieve the details about the authoritativeMN
