streamline logging messages, remove librato
hpiwowar committed Sep 3, 2012
1 parent 8b90821 commit bbf1a1e
Showing 5 changed files with 45 additions and 72 deletions.
1 change: 0 additions & 1 deletion requirements.txt
@@ -13,7 +13,6 @@ certifi==0.0.8
chardet==1.0.1
distribute==0.6.10
iso8601==0.1.4
-librato==0.2
lxml==2.3.4
newrelic==1.4.0.137
nose==1.1.2
74 changes: 24 additions & 50 deletions totalimpact/backend.py
@@ -11,27 +11,6 @@
logger.setLevel(logging.DEBUG)

thread_count = defaultdict(dict)
-#import librato
-# mylibrato = librato.LibratoConnection(os.environ["LIBRATO_METRICS_USER"], os.environ["LIBRATO_METRICS_TOKEN"])
-# def get_or_create(metric_type, name, description):
-# if metric_type=="counter":
-# try:
-# metric = mylibrato.get_counter(name)
-# except librato.exceptions.ClientError:
-# metric = mylibrato.create_counter(name, description)
-# else:
-# try:
-# metric = mylibrato.get_gauge(name)
-# except librato.exceptions.ClientError:
-# metric = mylibrato.create_gauge(name, description)
-# return metric
-
-# librato_provider_thread_start = get_or_create("guage", "provider_thread_start", "+1 when a provider thread is started")
-# librato_provider_thread_end = get_or_create("guage", "provider_thread_end", "+1 when a provider thread is ended")
-# librato_provider_thread_run_duration = get_or_create("gauge", "provider_thread_run_duration", "elapsed time for a provider thread to run")
-# librato_provider_thread_launch_duration = get_or_create("gauge", "provider_thread_launch_duration", "elapsed time for a provider thread to launch")
-# librato_provider_thread_count = get_or_create("gauge", "provider_thread_count", "number of threads running")


class RedisQueue(object):
def __init__(self, queue_name, myredis):
@@ -103,7 +82,7 @@ def __init__(self, provider, polling_interval, alias_queue, provider_queue, couc
self.couch_queues = couch_queues
self.wrapper = wrapper
self.myredis = myredis
self.name = "worker_"+self.provider_name
self.name = self.provider_name+"_worker"

# last variable is an artifact so it has same call signature as other callbacks
def add_to_couch_queue_if_nonzero(self, tiid, new_content, method_name, dummy=None):
@@ -132,7 +111,7 @@ def wrapper(cls, tiid, input_aliases_dict, provider, method_name, aliases_provid
# "wrapper", tiid=tiid, provider_name=provider.provider_name, method_name=method_name, aliases=aliases))

provider_name = provider.provider_name
worker_name = "worker_"+provider_name
worker_name = provider_name+"_worker"

input_alias_tuples = ItemFactory.alias_tuples_from_dict(input_aliases_dict)
method = getattr(provider, method_name)
@@ -158,47 +137,42 @@ def wrapper(cls, tiid, input_aliases_dict, provider, method_name, aliases_provid
logger.info("{:20}: RETURNED {tiid} {method_name} {provider_name} : {response}".format(
worker_name, tiid=tiid, method_name=method_name.upper(),
provider_name=provider_name.upper(), response=response))

callback(tiid, response, method_name, aliases_providers_run)

-# librato_provider_thread_run_duration.add(time.time()-start_time, source=provider_name)
-# librato_provider_thread_end.add(1, source=provider_name)
-# librato_provider_thread_count.add(len(thread_count[provider_name].keys()), source=provider_name)
+del thread_count[provider_name][tiid+method_name]

return response

def run(self):
provider_message = self.provider_queue.pop()
if provider_message:
logger.info("POPPED from queue for {provider}".format(
provider=self.provider_name))
#logger.info("POPPED from queue for {provider}".format(
# provider=self.provider_name))
(tiid, alias_dict, method_name, aliases_providers_run) = provider_message
if method_name == "aliases":
callback = self.add_to_alias_and_couch_queues
else:
callback = self.add_to_couch_queue_if_nonzero

-thread_count[self.provider.provider_name][tiid+method_name] = 1
+#logger.info("BEFORE STARTING thread for {tiid} {method_name} {provider}".format(
+# method_name=method_name.upper(), tiid=tiid, num=len(thread_count[self.provider.provider_name].keys()),
+# provider=self.provider.provider_name.upper()))

logger.info("BEFORE STARTING thread for {tiid} {method_name} {provider}, now at {num} {provider} threads".format(
method_name=method_name.upper(), tiid=tiid, num=len(thread_count[self.provider.provider_name].keys()),
provider=self.provider.provider_name.upper()))
thread_count[self.provider.provider_name][tiid+method_name] = 1
number_of_threads_for_this_provider = len(thread_count[self.provider.provider_name].keys())
number_of_total_provider_threads = sum([len(thread_count[p].keys()) for p in thread_count])

logger.info("NUMBER of {provider} threads = {num}".format(
num=len(thread_count[self.provider.provider_name].keys()),
logger.info("NUMBER of {provider} threads = {num_provider}, all provider threads = {num_total}".format(
num_provider=number_of_threads_for_this_provider,
num_total=number_of_total_provider_threads,
provider=self.provider.provider_name.upper()))

-# librato_provider_thread_start.add(1, source=self.provider.provider_name)
-# librato_provider_thread_count.add(len(thread_count[self.provider.provider_name].keys()), source=self.provider.provider_name)

t = threading.Thread(target=ProviderWorker.wrapper,
args=(tiid, alias_dict, self.provider, method_name, aliases_providers_run, callback),
name=self.provider_name+"-"+method_name.upper()+"-"+tiid[0:4])
t.start()

-# librato_provider_thread_launch_duration.add(time.time()-start_time, source=self.provider.provider_name)

# sleep to give the provider a rest :)
time.sleep(self.polling_interval)

Expand All @@ -209,7 +183,7 @@ def __init__(self, couch_queue, myredis, mydao):
self.couch_queue = couch_queue
self.myredis = myredis
self.mydao = mydao
self.name = "worker_" + self.couch_queue.queue_name
self.name = self.couch_queue.queue_name + "_worker"

def update_item_with_new_aliases(self, alias_dict, item):
if alias_dict == item["aliases"]:
@@ -324,15 +298,15 @@ def sniffer(cls, item_aliases, aliases_providers_run, provider_config=default_se
def run(self):
alias_message = self.alias_queue.pop()
if alias_message:
logger.info("{:20}: alias_message said {alias_message}".format(
"Backend.run", alias_message=alias_message))
logger.info("alias_message said {alias_message}".format(
alias_message=alias_message))
(tiid, alias_dict, aliases_providers_run) = alias_message

relevant_provider_names = self.sniffer(alias_dict, aliases_providers_run)
logger.info("{:20}: for {tiid} sniffer got input {alias_dict}".format(
"Backend", tiid=tiid, alias_dict=alias_dict))
logger.info("{:20}: for {tiid} sniffer returned {providers}".format(
"Backend", tiid=tiid, providers=relevant_provider_names))
logger.info("backend for {tiid} sniffer got input {alias_dict}".format(
tiid=tiid, alias_dict=alias_dict))
logger.info("backend for {tiid} sniffer returned {providers}".format(
tiid=tiid, providers=relevant_provider_names))

# list out the method names so they are run in that priority, biblio before metrics
for method_name in ["aliases", "biblio", "metrics"]:
@@ -356,11 +330,11 @@ def main():
# these need to match the tiid alphabet defined in models:
couch_queues = {}
for i in "abcdefghijklmnopqrstuvwxyz1234567890":
couch_queues[i] = PythonQueue("couch_queue_"+i)
couch_queues[i] = PythonQueue(i+"_couch_queue")
couch_worker = CouchWorker(couch_queues[i], myredis, mydao)
couch_worker.spawn_and_loop()
logger.info("{:20}: launched backend couch worker with couch_queue_{i}".format(
"Backend", i=i))
logger.info("launched backend couch worker with {i}_couch_queue".format(
i=i))


polling_interval = 0.1 # how many seconds between polling to talk to provider
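
A note on the run() change above: with librato gone, thread accounting is plain dict bookkeeping. thread_count maps each provider name to a dict of in-flight tiid+method_name keys, so the per-provider count is a len() and the total is a sum over providers. A minimal standalone sketch of that pattern (the provider names, tiids, and helper functions here are illustrative, not from the repo):

from collections import defaultdict

# provider name -> {tiid+method_name: 1} for each thread still running
thread_count = defaultdict(dict)

def note_thread_started(provider_name, tiid, method_name):
    # mirrors run(): record the thread before it is spawned
    thread_count[provider_name][tiid + method_name] = 1

def note_thread_finished(provider_name, tiid, method_name):
    # mirrors wrapper(): forget the thread once its callback has run
    del thread_count[provider_name][tiid + method_name]

def thread_counts(provider_name):
    # counts for one provider and across all providers
    num_provider = len(thread_count[provider_name].keys())
    num_total = sum([len(thread_count[p].keys()) for p in thread_count])
    return (num_provider, num_total)

note_thread_started("dryad", "tiid1", "metrics")
note_thread_started("github", "tiid2", "aliases")
print(thread_counts("dryad"))  # -> (1, 2)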
6 changes: 3 additions & 3 deletions totalimpact/dao.py
@@ -107,18 +107,18 @@ def save(self, doc):
if "_id" not in doc:
raise KeyError("tried to save doc with '_id' key unset.")

logger.info("%20s saving id '%s'" %("dao", doc["_id"]))
#logger.info("dao saving id '%s'" %(doc["_id"]))
retry = True
while retry:
try:
response = self.db.save(doc)
retry = False
except couchdb.ResourceConflict, e:
logger.info("%20s Couch conflict saving %s; will retry" % ("dao", doc["_id"]))
logger.info("dao Couch conflict saving %s; will retry" % (doc["_id"]))
newer_doc = self.get(doc["_id"])
doc["_rev"] = newer_doc["_rev"]
time.sleep(0.1)
logger.info("%20s saved %s" %("dao", doc["_id"]))
logger.info("dao saved %s" %(doc["_id"]))
return response


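
The save() hunk above only trims the log prefix, but it sits inside dao.py's conflict-retry loop: when CouchDB raises ResourceConflict, the code fetches the newer revision, copies its _rev onto the outgoing doc, and saves again. A minimal sketch of that pattern with couchdb-python (the server connection and database name are illustrative):

import time
import couchdb

couch = couchdb.Server()  # illustrative: assumes CouchDB on localhost
db = couch["ti"]          # illustrative database name

def save_with_retry(doc):
    # retry until our _rev matches the currently stored revision
    while True:
        try:
            return db.save(doc)
        except couchdb.ResourceConflict:
            newer_doc = db.get(doc["_id"])
            doc["_rev"] = newer_doc["_rev"]
            time.sleep(0.1)  # brief pause before retrying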
34 changes: 17 additions & 17 deletions totalimpact/providers/provider.py
@@ -119,11 +119,11 @@ def _get_error(self, status_code, response=None):
text = ""
if status_code >= 500:
error = ProviderServerError(response)
self.logger.info("%20s ProviderServerError status code=%i, %s, %s"
self.logger.info("%s ProviderServerError status code=%i, %s, %s"
% (self.provider_name, status_code, text, str(headers)))
else:
error = ProviderClientError(response)
self.logger.info("%20s ProviderClientError status code=%i, %s, %s"
self.logger.info("%s ProviderClientError status code=%i, %s, %s"
% (self.provider_name, status_code, text, str(headers)))

raise(error)
@@ -136,7 +136,7 @@ def _get_templated_url(self, template, id, method=None):
def relevant_aliases(self, aliases):
filtered = [alias for alias in aliases
if self.is_relevant_alias(alias)]
#self.logger.debug("%20s relevant_aliases are %s given %s" % (self.provider_name, str(filtered), str(aliases)))
#self.logger.debug("%s relevant_aliases are %s given %s" % (self.provider_name, str(filtered), str(aliases)))

return filtered

@@ -207,7 +207,7 @@ def member_items(self,
if not self.provides_members:
raise NotImplementedError()

self.logger.debug("%20s getting member_items for %s" % (self.provider_name, query_string))
self.logger.debug("%s getting member_items for %s" % (self.provider_name, query_string))

if not provider_url_template:
provider_url_template = self.member_items_url_template
@@ -219,7 +219,7 @@ def member_items(self,
response = self.http_get(url, cache_enabled=cache_enabled)

if response.status_code != 200:
self.logger.info("%20s status_code=%i"
self.logger.info("%s status_code=%i"
% (self.provider_name, response.status_code))
if response.status_code == 404:
raise ProviderItemNotFoundError
@@ -246,7 +246,7 @@ def biblio(self,

# Only lookup biblio for items with appropriate ids
if not id:
#self.logger.debug("%20s not checking biblio, no relevant alias" % (self.provider_name))
#self.logger.debug("%s not checking biblio, no relevant alias" % (self.provider_name))
return None

if not provider_url_template:
@@ -263,7 +263,7 @@ def get_biblio_for_id(self,
if not self.provides_biblio:
return {}

self.logger.debug("%20s getting biblio for %s" % (self.provider_name, id))
self.logger.debug("%s getting biblio for %s" % (self.provider_name, id))

if not provider_url_template:
provider_url_template = self.biblio_url_template
@@ -273,7 +273,7 @@
response = self.http_get(url, cache_enabled=cache_enabled)

if response.status_code != 200:
self.logger.info("%20s status_code=%i"
self.logger.info("%s status_code=%i"
% (self.provider_name, response.status_code))
if response.status_code == 404: #not found
return {}
@@ -302,7 +302,7 @@ def aliases(self,
relevant_aliases = self.relevant_aliases(aliases)

if not relevant_aliases:
#self.logger.debug("%20s not checking aliases, no relevant alias" % (self.provider_name))
#self.logger.debug("%s not checking aliases, no relevant alias" % (self.provider_name))
return []

new_aliases = []
@@ -323,7 +323,7 @@ def _get_aliases_for_id(self,
if not self.provides_aliases:
return []

self.logger.debug("%20s getting aliases for %s" % (self.provider_name, id))
self.logger.debug("%s getting aliases for %s" % (self.provider_name, id))

if not provider_url_template:
provider_url_template = self.aliases_url_template
@@ -333,7 +333,7 @@
response = self.http_get(url, cache_enabled=cache_enabled)

if response.status_code != 200:
self.logger.info("%20s status_code=%i"
self.logger.info("%s status_code=%i"
% (self.provider_name, response.status_code))
if response.status_code == 404:
return []
@@ -364,7 +364,7 @@ def metrics(self,

# Only lookup metrics for items with appropriate ids
if not id:
#self.logger.debug("%20s not checking metrics, no relevant alias" % (self.provider_name))
#self.logger.debug("%s not checking metrics, no relevant alias" % (self.provider_name))
return {}

if not provider_url_template:
Expand All @@ -388,7 +388,7 @@ def get_metrics_for_id(self,
if not self.provides_metrics:
return {}

self.logger.debug("%20s getting metrics for %s" % (self.provider_name, id))
self.logger.debug("%s getting metrics for %s" % (self.provider_name, id))

if not provider_url_template:
provider_url_template = self.metrics_url_template
@@ -397,7 +397,7 @@
# try to get a response from the data provider
response = self.http_get(url, cache_enabled=cache_enabled, allow_redirects=True)

#self.logger.debug("%20s get_metrics_for_id response.status_code %i" % (self.provider_name, response.status_code))
#self.logger.debug("%s get_metrics_for_id response.status_code %i" % (self.provider_name, response.status_code))

# extract the metrics
metrics_dict = self._extract_metrics(response.text, response.status_code, id=id)
@@ -468,7 +468,7 @@ class CachedResponse:
self.logger.debug("LIVE %s" %(url))
r = requests.get(url, headers=headers, timeout=timeout, proxies=proxies, allow_redirects=allow_redirects, verify=False)
except requests.exceptions.Timeout as e:
self.logger.info("%20s Attempt to connect to provider timed out during GET on %s" %(self.provider_name, url))
self.logger.info("%s Attempt to connect to provider timed out during GET on %s" %(self.provider_name, url))
raise ProviderTimeout("Attempt to connect to provider timed out during GET on " + url, e)
except requests.exceptions.RequestException as e:
raise ProviderHttpError("RequestException during GET on: " + url, e)
@@ -540,7 +540,7 @@ def _load_json(page):
try:
data = simplejson.loads(page)
except simplejson.JSONDecodeError, e:
logger.error("%20s json decode fail '%s'. Here's the string: %s" %("_load_json", e.msg, page))
logger.error("%s json decode fail '%s'. Here's the string: %s" %("_load_json", e.msg, page))
raise ProviderContentMalformedError
return(data)

@@ -667,7 +667,7 @@ def _extract_from_xml(page, dict_of_keylists):

# given a url that has a doi embedded in it, return the doi
def doi_from_url_string(url):
logger.info("%20s parsing url %s" %("doi_from_url_string", url))
logger.info("%s parsing url %s" %("doi_from_url_string", url))

result = re.findall("(10\.\d+.[0-9a-wA-W_/\.\-%]+)" , url, re.DOTALL)
try:
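
All seventeen provider.py changes are the same mechanical edit: dropping the '%20s' directive that right-aligned the provider name in a 20-column field, in favor of a plain '%s'. A quick illustration of what that changes in the output (the provider name and status code are illustrative):

provider_name = "dryad"

# before: name right-aligned in a fixed 20-column field
print("%20s status_code=%i" % (provider_name, 404))
# '               dryad status_code=404'

# after: no padding, the message starts with the name
print("%s status_code=%i" % (provider_name, 404))
# 'dryad status_code=404'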
2 changes: 1 addition & 1 deletion totalimpact/tiredis.py
@@ -24,7 +24,7 @@ def get_num_providers_left(self, item_id):

def decr_num_providers_left(self, item_id, provider_name):
num_providers_left = self.decr(item_id)
logger.info("%20s bumped providers_run with %s for %s. %s left to run." % ("tiredis",
logger.info("bumped providers_run with %s for %s. %s left to run." % (
provider_name, item_id, num_providers_left))
return int(num_providers_left)

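
decr_num_providers_left relies on Redis's atomic DECR: each item id keys a countdown of providers still to run, so concurrent workers can decrement without a race. A minimal sketch with redis-py (the connection, key, and starting count are illustrative):

import redis

r = redis.StrictRedis()  # illustrative: assumes Redis on localhost

item_id = "tiid-abc123"  # illustrative item id
r.set(item_id, 3)        # three providers still to run for this item

# each worker does this when its provider finishes
num_providers_left = r.decr(item_id)  # atomic; returns the new value, here 2
print("%s left to run" % num_providers_left)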
