Setting up max_retries for HTTP requests
When using async requests, the module performs up to 20 concurrent requests and urllib3 sometimes fails to fetch a connection, so this sets up max_retries for urllib3.

This change applies to both the resolve_MN and query_solr functions.
rushirajnenuji committed May 11, 2019
1 parent 5ac7bf1 commit 82faf1f
Showing 1 changed file with 66 additions and 40 deletions.
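
The pattern this commit applies, as a minimal standalone sketch (the URL is the CN endpoint used in the diff below; the rest is generic requests/urllib3 usage):

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Session whose HTTPS requests transparently retry failed connection attempts.
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)  # up to 3 connect retries, exponential backoff
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)

resp = session.get("https://cn.dataone.org/cn/v2/node/")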
106 changes: 66 additions & 40 deletions src/d1_metrics/d1_metrics/metricsreporter.py
@@ -7,6 +7,7 @@

import argparse
import sys
import time
import requests
import json
import urllib.request
@@ -21,6 +22,8 @@
import asyncio
from aiohttp import ClientSession
import concurrent.futures
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
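
Note: requests.packages.urllib3 in the import above is a compatibility alias; on current installations the same class can be imported from urllib3 directly, which is a hard dependency of requests anyway:

from urllib3.util.retry import Retry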

DEFAULT_REPORT_CONFIGURATION={
"report_url" : "https://api.datacite.org/reports",
@@ -456,8 +459,13 @@ def resolve_MN(self, authoritativeMN):
:param authoritativeMN:
:return: String value of the name of the authoritativeMN
"""
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)

node_url = "https://cn.dataone.org/cn/v2/node/" + authoritativeMN
resp = requests.get(node_url, stream=True)
resp = session.get(node_url, stream=True)
root = ElementTree.fromstring(resp.content)
name = root.find('name').text
return name
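
For reference, urllib3 sleeps roughly backoff_factor * (2 ** (retry_number - 1)) seconds between attempts, so backoff_factor=0.5 yields a 0.5 s, 1 s, 2 s schedule (older urllib3 releases skip the sleep before the first retry). A small illustration of that schedule, not part of the commit:

def backoff_schedule(backoff_factor, retries):
    # Mirrors urllib3's Retry backoff formula for consecutive errors.
    return [backoff_factor * (2 ** (n - 1)) for n in range(1, retries + 1)]

print(backoff_schedule(0.5, 3))  # [0.5, 1.0, 2.0]

Mounting the adapter on 'https://' applies it to every HTTPS request made through this session; resolve_MN only contacts the CN over HTTPS, while query_solr below mounts both schemes.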
Expand Down Expand Up @@ -540,8 +548,14 @@ def query_solr(self, PID):
:return: JSON Object containing the metadata fields queried from Solr
"""

session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

queryString = 'q=id:"' + PID + '"&fl=origin,title,datePublished,dateUploaded,authoritativeMN,dataUrl&wt=json'
response = requests.get(url = self._config["solr_query_url"], params = queryString)
response = session.get(url = self._config["solr_query_url"], params = queryString)

return response.json()
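
requests also accepts params as a dict, in which case it URL-encodes each value; an equivalent sketch of the query above, with pid and solr_query_url standing in for the real arguments and session built as in the diff:

pid = "doi:10.5063/EXAMPLE"  # hypothetical identifier
solr_query_url = "https://cn.dataone.org/cn/v2/query/solr/"  # stand-in for self._config["solr_query_url"]
params = {
    "q": 'id:"' + pid + '"',
    "fl": "origin,title,datePublished,dateUploaded,authoritativeMN,dataUrl",
    "wt": "json",
}
response = session.get(solr_query_url, params=params)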

@@ -552,48 +566,59 @@ def scheduler(self):
Probably would be called only once in its lifetime
:return: None
"""
mn_list = self.get_MN_List()
mn_list = self.get_MN_Dict()
for node in mn_list:
date = datetime(2013, 1, 1)
stopDate = datetime(2013, 12, 31)

count = 0
while (date.strftime('%Y-%m-%d') != stopDate.strftime('%Y-%m-%d')):
self.logger.debug("Running job for Node: " + node)
# while (date.strftime('%Y-%m-%d') != stopDate.strftime('%Y-%m-%d')):
# self.logger.debug("Running job for Node: " + node)
#
# count = count + 1
#
# prevDate = date + timedelta(days=1)
# date = self.last_day_of_month(prevDate)
#
# start_date, end_date = prevDate.strftime('%m/%d/%Y'),\
# date.strftime('%m/%d/%Y')
#
# unique_pids = self.get_unique_pids(start_date, end_date, node, doi=True)
#
# if (len(unique_pids) > 0):
# self.logger.debug("Job " + " : " + start_date + " to " + end_date)
#
# # Uncomment me to send reports to the HUB!
# response = self.report_handler(start_date, end_date, node, unique_pids)
#
#
# logentry = "Node " + node + " : " + start_date + " to " + end_date + " === " + str(response.status_code)
#
# self.logger.debug(logentry)
#
# if response.status_code != 201:
#
# logentry = "Node " + node + " : " + start_date + " to " + end_date + " === " \
# + str(response.status_code)
# self.logger.error(logentry)
# self.logger.error(str(response.status_code) + " " + response.reason)
# self.logger.error("Headers: " + str(response.headers))
# self.logger.error("Content: " + str((response.content).decode("utf-8")))
# else:
# self.logger.debug(
# "Skipping job for " + node + " " + start_date + " to " + end_date + " - length of PIDS : " + str(
# len(unique_pids)))

count = count + 1

while (date.strftime('%Y-%m-%d') != stopDate.strftime('%Y-%m-%d')):
prevDate = date + timedelta(days=1)
date = self.last_day_of_month(prevDate)

start_date, end_date = prevDate.strftime('%m/%d/%Y'),\
date.strftime('%m/%d/%Y')

unique_pids = self.get_unique_pids(start_date, end_date, node, doi=True)

if (len(unique_pids) > 0):
self.logger.debug("Job " + " : " + start_date + " to " + end_date)

# Uncomment me to send reports to the HUB!
response = self.report_handler(start_date, end_date, node, unique_pids)
start_date, end_date = prevDate.strftime('%m/%d/%Y'), \
date.strftime('%m/%d/%Y')


logentry = "Node " + node + " : " + start_date + " to " + end_date + " === " + str(response.status_code)

self.logger.debug(logentry)

if response.status_code != 201:

logentry = "Node " + node + " : " + start_date + " to " + end_date + " === " \
+ str(response.status_code)
self.logger.error(logentry)
self.logger.error(str(response.status_code) + " " + response.reason)
self.logger.error("Headers: " + str(response.headers))
self.logger.error("Content: " + str((response.content).decode("utf-8")))
else:
self.logger.debug(
"Skipping job for " + node + " " + start_date + " to " + end_date + " - length of PIDS : " + str(
len(unique_pids)))
print("Running for ", start_date, end_date, node)
self.get_es_unique_dois(start_date, end_date, nodeId = node)
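
The active loop above walks 2013 one calendar-month window at a time; a self-contained sketch of the window generation follows. The December guard in the helper is an assumption, since only the helper's return line is visible below:

from datetime import datetime, timedelta

def last_day_of_month(date):
    if date.month == 12:  # assumed guard; date.replace(month=13) would raise ValueError
        return date.replace(day=31)
    return date.replace(month=date.month + 1, day=1) - timedelta(days=1)

date = datetime(2013, 1, 1)
stopDate = datetime(2013, 12, 31)
while date.strftime('%Y-%m-%d') != stopDate.strftime('%Y-%m-%d'):
    prevDate = date + timedelta(days=1)
    date = last_day_of_month(prevDate)
    print(prevDate.strftime('%m/%d/%Y'), "to", date.strftime('%m/%d/%Y'))
# 01/02/2013 to 01/31/2013, 02/01/2013 to 02/28/2013, ..., 12/01/2013 to 12/31/2013

Note the first window starts on 01/02 rather than 01/01, because prevDate is date plus one day even on the first pass.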


@@ -602,7 +627,7 @@ def last_day_of_month(self, date):
return date.replace(month=date.month + 1, day=1) - timedelta(days=1)


def get_MN_List(self):
def get_MN_Dict(self):
"""
Retrieves MN identifiers from the https://cn.dataone.org/cn/v2/node/ endpoint
Used to send the reports for different MNs
@@ -611,13 +636,14 @@
node_url = "https://cn.dataone.org/cn/v2/node/"
resp = requests.get(node_url, stream=True)
root = ElementTree.fromstring(resp.content)
mn_list = set()
mn_dict = dict()

for child in root:
node_type = child.attrib['type']
identifier = child.find('identifier')
if (node_type == "mn"):
mn_list.add(identifier.text)
return(mn_list)
identifier = child.find('identifier').text
name = child.find('name').text
mn_dict[identifier] = name

return(mn_dict)
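
A self-contained illustration of what the new get_MN_Dict extracts; the XML here is a simplified stand-in for the CN node list (the real document carries more attributes and elements):

import xml.etree.ElementTree as ElementTree

xml = '''
<nodeList>
  <node type="mn"><identifier>urn:node:EXAMPLE</identifier><name>Example Member Node</name></node>
  <node type="cn"><identifier>urn:node:CN1</identifier><name>cn-dataone</name></node>
</nodeList>
'''

root = ElementTree.fromstring(xml)
mn_dict = dict()
for child in root:
    if child.attrib.get('type') == 'mn':
        mn_dict[child.find('identifier').text] = child.find('name').text

print(mn_dict)  # {'urn:node:EXAMPLE': 'Example Member Node'}

Returning a dict keyed by node identifier keeps each node's display name at hand, which resolve_MN above otherwise has to fetch with a separate CN request per node.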


def get_es_unique_dois(self, start_date, end_date, nodeId = None):
