Skip to content

Commit

Permalink
Merge pull request #224 from falconkirtaran/master
Browse files Browse the repository at this point in the history
Async shipping of logs from pipelines to redis
  • Loading branch information
hannahwhy committed Sep 11, 2016
2 parents faa63b2 + 8448e84 commit b2f62d2
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 28 deletions.
84 changes: 57 additions & 27 deletions pipeline/archivebot/control.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import os
import redis
import time

from queue import Queue
from threading import Thread
from contextlib import contextmanager
from redis.exceptions import ConnectionError as RedisConnectionError

Expand Down Expand Up @@ -53,13 +54,17 @@ class Control(object):
def __init__(self, redis_url, log_channel, pipeline_channel):
    """Store connection settings, connect, and start the log shipper.

    redis_url, log_channel and pipeline_channel are stored unchanged on
    the instance.
    """
    # Connection settings.
    self.redis_url = redis_url
    self.log_channel = log_channel
    self.pipeline_channel = pipeline_channel

    # Counters accumulated locally; the item counters are handed to the
    # log queue and reset by flush_item_counts().
    self.bytes_outstanding = 0
    self.items_downloaded_outstanding = 0
    self.items_queued_outstanding = 0

    # Work queue consumed by ship_logs() on the background thread.
    self.log_queue = Queue()

    self.connect()

    # Daemon thread: it must not keep the process alive at exit.
    shipper = Thread(target=self.ship_logs, daemon=True)
    self.log_thread = shipper
    shipper.start()

def connected(self):
    """Return True when ``self.redis`` is set (i.e. is not None)."""
    no_client = self.redis is None
    return not no_client

Expand Down Expand Up @@ -135,14 +140,10 @@ def mark_aborted(self, ident):
self.mark_aborted_script(keys=[ident], args=[self.log_channel])

def update_bytes_downloaded(self, ident, size):
try:
with conn(self):
self.bytes_outstanding += size
self.redis.hincrby(ident, 'bytes_downloaded',
self.bytes_outstanding)
self.bytes_outstanding = 0
except ConnectionError:
pass
self.log_queue.put({'type': 'bytes_downloaded',
'ident': ident,
'bytes': size
})

def update_items_downloaded(self, count):
    """Add ``count`` to the locally accumulated downloaded-items counter.

    Nothing is sent to redis here; the running total is only stored on
    the instance.
    """
    running_total = self.items_downloaded_outstanding
    self.items_downloaded_outstanding = running_total + count
Expand All @@ -151,17 +152,14 @@ def update_items_queued(self, count):
self.items_queued_outstanding += count

def flush_item_counts(self, ident):
try:
with conn(self):
self.redis.hincrby(ident, 'items_downloaded',
self.items_downloaded_outstanding)
self.items_downloaded_outstanding = 0

self.redis.hincrby(ident, 'items_queued',
self.items_queued_outstanding)
self.items_queued_outstanding = 0
except ConnectionError:
pass
self.log_queue.put({'type': 'item_counts',
'ident': ident,
'items_downloaded':
self.items_downloaded_outstanding,
'items_queued': self.items_queued_outstanding
})
self.items_downloaded_outstanding = 0
self.items_queued_outstanding = 0

def pipeline_report(self, pipeline_id, report):
try:
Expand All @@ -181,13 +179,45 @@ def unregister_pipeline(self, pipeline_id):
except ConnectionError:
pass

# Body of the background (daemonic) thread that asynchronously ships queued
# log entries and counter updates for this job to redis.
def ship_logs(self):
    """Drain ``self.log_queue`` forever, applying each entry to redis.

    Every entry is a dict whose ``'type'`` key selects the action:

    * ``'log'`` -- call ``self.log_script`` with the entry's ``'keys'``
      and ``'args'``.
    * ``'bytes_downloaded'`` -- HINCRBY the ``bytes_downloaded`` field of
      the hash named by ``'ident'`` by ``'bytes'``.
    * ``'item_counts'`` -- HINCRBY the ``items_downloaded`` and
      ``items_queued`` fields of the hash named by ``'ident'``.

    Entries of any other type are acknowledged and otherwise ignored.
    An entry whose redis call raises ConnectionError is dropped, not
    retried.  This method never returns.
    """
    while True:
        entry = self.log_queue.get()  # blocks until an entry is available

        try:
            # Compare with ==, not `is`: identity of equal strings is an
            # interning accident, not a language guarantee.
            kind = entry['type']

            if kind == 'log':
                with conn(self):
                    self.log_script(keys=entry['keys'], args=entry['args'])

            elif kind == 'bytes_downloaded':
                with conn(self):
                    self.redis.hincrby(entry['ident'], 'bytes_downloaded',
                                       entry['bytes'])

            elif kind == 'item_counts':
                with conn(self):
                    self.redis.hincrby(entry['ident'], 'items_downloaded',
                                       entry['items_downloaded'])
                    self.redis.hincrby(entry['ident'], 'items_queued',
                                       entry['items_queued'])
        except ConnectionError:
            # Best effort: the update is lost if redis is unreachable.
            # NOTE(review): redis's own error is imported as
            # RedisConnectionError, so this name is presumably a
            # module-level class raised by conn() -- confirm.
            pass
        finally:
            # Pair every get() with task_done(), even if the entry failed.
            self.log_queue.task_done()


def log(self, packet, ident, log_key):
try:
with conn(self):
self.log_script(keys=[ident], args=[json.dumps(packet),
self.log_channel, log_key])
except ConnectionError:
pass
self.log_queue.put({'type': 'log',
'keys': [ident],
'args': [json.dumps(packet), self.log_channel, log_key]
})

def get_url_file(self, ident):
try:
Expand Down
2 changes: 1 addition & 1 deletion pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"See https://bugs.python.org/issue21435"

assert WPULL_EXE, 'No usable Wpull found.'
assert PHANTOMJS, 'PhantomJS %s was not found.' % PHANTOMJS_VERSION
assert PHANTOMJS, 'PhantomJS %s was not found.' % PHANTOMJS_VERSIONS
assert YOUTUBE_DL, 'No usable youtube-dl found.'
assert 'RSYNC_URL' in env, 'RSYNC_URL not set.'
assert 'REDIS_URL' in env, 'REDIS_URL not set.'
Expand Down

0 comments on commit b2f62d2

Please sign in to comment.