
Commit

Merge pull request #227 from falconkirtaran/master
Limit transmitted filename and transmitted bucket DNS component to 64 bytes each for S3 uploads
hannahwhy committed Oct 26, 2016
2 parents caed894 + 7b06679 commit 6080562
Showing 2 changed files with 67 additions and 43 deletions.
106 changes: 65 additions & 41 deletions pipeline/archivebot/control.py
@@ -2,7 +2,7 @@
 import redis
 import time
 
-from queue import Queue
+from queue import Queue, Empty
 from threading import Thread
 from contextlib import contextmanager
 from redis.exceptions import ConnectionError as RedisConnectionError
@@ -58,6 +58,8 @@ def __init__(self, redis_url, log_channel, pipeline_channel):
         self.items_queued_outstanding = 0
         self.redis_url = redis_url
         self.log_queue = Queue()
+        self.bytes_downloaded_queue = Queue()
+        self.item_count_queue = Queue()
 
         self.connect()
 
@@ -140,10 +142,9 @@ def mark_aborted(self, ident):
         self.mark_aborted_script(keys=[ident], args=[self.log_channel])
 
     def update_bytes_downloaded(self, ident, size):
-        self.log_queue.put({'type': 'bytes_downloaded',
-                            'ident': ident,
-                            'bytes': size
-                            })
+        self.bytes_downloaded_queue.put({'ident': ident,
+                                         'bytes': size
+                                         })
 
     def update_items_downloaded(self, count):
         self.items_downloaded_outstanding += count
@@ -152,12 +153,11 @@ def update_items_queued(self, count):
         self.items_queued_outstanding += count
 
     def flush_item_counts(self, ident):
-        self.log_queue.put({'type': 'item_counts',
-                            'ident': ident,
-                            'items_downloaded':
-                                self.items_downloaded_outstanding,
-                            'items_queued': self.items_queued_outstanding
-                            })
+        self.item_count_queue.put({'ident': ident,
+                                   'items_downloaded':
+                                       self.items_downloaded_outstanding,
+                                   'items_queued': self.items_queued_outstanding
+                                   })
         self.items_downloaded_outstanding = 0
         self.items_queued_outstanding = 0
 
@@ -183,39 +183,63 @@ def unregister_pipeline(self, pipeline_id):
     # this job, in a daemonic thread
     def ship_logs(self):
         while True:
-            entry = self.log_queue.get() #infinite blocking
-
-            if entry['type'] is 'log':
-                try:
-                    with conn(self):
-                        self.log_script(keys=entry['keys'], args=entry['args'])
-                except ConnectionError:
-                    pass
-
-            elif entry['type'] is 'bytes_downloaded':
-                try:
-                    with conn(self):
-                        self.redis.hincrby(entry['ident'], 'bytes_downloaded',
-                                           entry['bytes'])
-                except ConnectionError:
-                    pass
-
-            elif entry['type'] is 'item_counts':
-                try:
-                    with conn(self):
-                        self.redis.hincrby(entry['ident'], 'items_downloaded',
-                                           entry['items_downloaded'])
-                        self.redis.hincrby(entry['ident'], 'items_queued',
-                                           entry['items_queued'])
-                except ConnectionError:
-                    pass
-
-            self.log_queue.task_done()
+            # Ship a log entry
+            try:
+                entry = self.log_queue.get(timeout=5)
+                with conn(self):
+                    self.log_script(keys=entry['keys'], args=entry['args'])
+                self.log_queue.task_done()
+            except Empty:
+                pass
+            except ConnectionError:
+                self.log_queue.task_done()
+
+            # Ship bytes entries in aggregate
+            bytes_entries = {}
+            try:
+                entry = self.bytes_downloaded_queue.get(block=False)
+                if entry['ident'] in bytes_entries:
+                    bytes_entries[entry['ident']] += int(entry['bytes'])
+                else:
+                    bytes_entries[entry['ident']] = int(entry['bytes'])
+                self.bytes_downloaded_queue.task_done()
+            except Empty:
+                pass
+
+            try:
+                with conn(self):
+                    for ident, count in bytes_entries.items():
+                        self.redis.hincrby(ident, 'bytes_downloaded', count)
+            except ConnectionError:
+                pass
+
+            # Ship count entries in aggregate
+            counts_entries = {}
+            try:
+                entry = self.item_count_queue.get(block=False)
+                if entry['ident'] in counts_entries:
+                    counts_entries[entry['ident']][0] += int(entry['items_downloaded'])
+                    counts_entries[entry['ident']][1] += int(entry['items_queued'])
+                else:
+                    counts_entries[entry['ident']] = [
+                        int(entry['items_downloaded']),
+                        int(entry['items_queued'])
+                    ]
+                self.item_count_queue.task_done()
+            except Empty:
+                pass
+
+            try:
+                with conn(self):
+                    for ident, data in counts_entries.items():
+                        self.redis.hincrby(ident, 'items_downloaded', data[0])
+                        self.redis.hincrby(ident, 'items_queued', data[1])
+            except ConnectionError:
+                pass
 
 
     def log(self, packet, ident, log_key):
-        self.log_queue.put({'type': 'log',
-                            'keys': [ident],
+        self.log_queue.put({'keys': [ident],
                             'args': [json.dumps(packet), self.log_channel, log_key]
                             })
 
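The reworked ship_logs loop takes byte and item-count updates off the log queue, accumulates them into per-ident totals, and folds each total into Redis with a single HINCRBY instead of one round trip per event. Below is a minimal standalone sketch of that drain-and-aggregate idea; the drain_and_aggregate helper and the 'job1' ident are made up for illustration, and unlike the committed loop (which pulls one entry per pass) this sketch drains the whole queue:

    # Hypothetical sketch of drain-and-aggregate; not ArchiveBot's actual loop.
    from queue import Queue, Empty

    def drain_and_aggregate(q):
        # Pull everything currently queued and sum byte counts per ident,
        # so each ident needs only one HINCRBY round trip afterwards.
        totals = {}
        while True:
            try:
                entry = q.get(block=False)
            except Empty:
                break  # queue is drained
            totals[entry['ident']] = totals.get(entry['ident'], 0) + int(entry['bytes'])
            q.task_done()
        return totals

    bytes_downloaded_queue = Queue()
    bytes_downloaded_queue.put({'ident': 'job1', 'bytes': 512})
    bytes_downloaded_queue.put({'ident': 'job1', 'bytes': 256})
    print(drain_and_aggregate(bytes_downloaded_queue))  # {'job1': 768}

One consequence of aggregating before the Redis call: if the connection drops, that batch of increments is discarded rather than retried, which matches how the committed code handles ConnectionError.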
4 changes: 2 additions & 2 deletions uploader/uploader.py
@@ -155,12 +155,12 @@ def main():
                 "rsync", "-av", "--timeout=300", "--contimeout=300",
                 "--progress", fname_u, url])
         elif mode == 's3':
-            ia_upload_bucket = re.sub(r'[^0-9a-zA-Z-]+', '_', ia_item_prefix + '_' + item['dns'] + '_' + item['date'])
+            ia_upload_bucket = re.sub(r'[^0-9a-zA-Z-]+', '_', ia_item_prefix + '_' + item['dns'][-64:] + '_' + item['date'])
             if ia_upload_allowed(url, ia_access, ia_upload_bucket): # IA is not throttling
                 # At some point, an ambitious person could try a file belonging in a different bucket if ia_upload_allowed denied this one
                 size_hint = str(os.stat(fname_u).st_size)
                 target = url + '/' + ia_upload_bucket + '/' + \
-                    re.sub(r'[^0-9a-zA-Z-.]+', '_', basename)
+                    re.sub(r'[^0-9a-zA-Z-.]+', '_', basename)[-64:]
 
                 exit_code = subprocess.call([
                     "curl", "-v", "--location", "--fail",
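Both new [-64:] slices cap the job-specific parts of the upload path: the DNS component folded into the bucket name, and the transmitted filename. Slicing from the end rather than the start keeps the most specific labels of the hostname and preserves the file extension. A quick illustration with made-up values (the prefix, hostname, and filename below are hypothetical):

    # Made-up inputs to show the truncation behaviour; not real ArchiveBot data.
    import re

    ia_item_prefix = 'archivebot_example'      # hypothetical prefix
    dns = 'a' * 80 + '.example.com'            # overlong DNS component (92 chars)
    basename = 'b' * 90 + '.warc.gz'           # overlong filename (98 chars)

    bucket = re.sub(r'[^0-9a-zA-Z-]+', '_',
                    ia_item_prefix + '_' + dns[-64:] + '_' + '20161026')
    fname = re.sub(r'[^0-9a-zA-Z-.]+', '_', basename)[-64:]

    print(len(dns[-64:]))                          # 64: DNS part capped
    print(fname.endswith('.warc.gz'), len(fname))  # True 64: extension survives

Note that the cap is applied before re.sub for the bucket (on the raw DNS string) but after it for the filename, so the filename limit applies to the sanitized name that is actually transmitted.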
