Merge pull request #209 from falconkirtaran/master
Support for uploading directly to IA
hannahwhy committed Jul 6, 2016
2 parents 2c7e330 + 09800d6 commit 5f53460
Showing 4 changed files with 141 additions and 21 deletions.
pipeline/archivebot/seesaw/wpull.py (10 changes: 6 additions & 4 deletions)
@@ -9,7 +9,7 @@ def add_args(args, names, item):
         if value:
             args.append(value)
 
-def make_args(item, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe, finished_warcs_dir):
+def make_args(item, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe, finished_warcs_dir, warc_max_size):
     # -----------------------------------------------------------------------
     # BASE ARGUMENTS
     # -----------------------------------------------------------------------
@@ -40,7 +40,7 @@ def make_args(item, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe
         '--tries', '3',
         '--waitretry', '5',
         '--warc-file', '%(item_dir)s/%(warc_file_base)s' % item,
-        '--warc-max-size', '5368709120',
+        '--warc-max-size', warc_max_size,
         '--warc-header', 'operator: Archive Team',
         '--warc-header', 'downloaded-by: ArchiveBot',
         '--warc-header', 'archivebot-job-ident: %(ident)s' % item,
@@ -114,15 +114,17 @@ def make_args(item, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe
 # ---------------------------------------------------------------------------
 
 class WpullArgs(object):
-    def __init__(self, *, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe, finished_warcs_dir):
+    def __init__(self, *, default_user_agent, wpull_exe, youtube_dl_exe, phantomjs_exe, finished_warcs_dir, warc_max_size):
         self.default_user_agent = default_user_agent
         self.wpull_exe = wpull_exe
         self.youtube_dl_exe = youtube_dl_exe
         self.phantomjs_exe = phantomjs_exe
         self.finished_warcs_dir = finished_warcs_dir
+        self.warc_max_size = warc_max_size
 
     def realize(self, item):
         return make_args(item, self.default_user_agent, self.wpull_exe,
-            self.youtube_dl_exe, self.phantomjs_exe, self.finished_warcs_dir)
+            self.youtube_dl_exe, self.phantomjs_exe, self.finished_warcs_dir,
+            self.warc_max_size)
 
 # vim:ts=4:sw=4:et:tw=78
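Note: the hard-coded limit this change replaces, 5368709120, is exactly 5 GiB; wpull takes --warc-max-size in bytes, passed on the command line as a string. A minimal sanity check of that equivalence, in the codebase's own Python:

    # The old hard-coded cap, kept as the fallback default in pipeline.py below.
    DEFAULT_WARC_MAX_SIZE = '5368709120'
    assert int(DEFAULT_WARC_MAX_SIZE) == 5 * 1024 ** 3  # 5 GiB, in bytes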
pipeline/pipeline.py (8 changes: 7 additions & 1 deletion)
@@ -57,6 +57,11 @@
 assert 'REDIS_URL' in env, 'REDIS_URL not set.'
 assert 'FINISHED_WARCS_DIR' in env, 'FINISHED_WARCS_DIR not set.'
 
+if 'WARC_MAX_SIZE' in env:
+    WARC_MAX_SIZE = env['WARC_MAX_SIZE']
+else:
+    WARC_MAX_SIZE = '5368709120'
+
 assert 'TMUX' in env or 'STY' in env or env.get('NO_SCREEN') == "1", \
     "Refusing to start outside of screen or tmux, set NO_SCREEN=1 to override"
 
@@ -115,7 +120,8 @@ def __contains__(self, item):
     wpull_exe=WPULL_EXE,
     youtube_dl_exe=YOUTUBE_DL,
     phantomjs_exe=PHANTOMJS,
-    finished_warcs_dir=os.environ["FINISHED_WARCS_DIR"]
+    finished_warcs_dir=os.environ["FINISHED_WARCS_DIR"],
+    warc_max_size=WARC_MAX_SIZE
 )
 
 pipeline = Pipeline(
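With this change the cap is read from the WARC_MAX_SIZE environment variable, falling back to the previous 5 GiB when unset. A hedged usage sketch; the 2 GiB figure is illustrative, not something the commit prescribes:

    import os

    # Illustrative override: cap each WARC at 2 GiB instead of the default.
    # The pipeline keeps the value as a string and hands it to wpull unchanged.
    os.environ['WARC_MAX_SIZE'] = str(2 * 1024 ** 3)  # '2147483648'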
pipeline/requirements.txt (2 changes: 1 addition & 1 deletion)
@@ -3,7 +3,7 @@ psutil
 pyyaml
 redis==2.10.3
 trollius>=1.0.2
-wpull==1.2.1
+wpull==1.2.2
 sqlalchemy
 hiredis
 requests
uploader/uploader.py (142 changes: 127 additions & 15 deletions)
@@ -6,9 +6,12 @@
 import time
 import subprocess
 import sys
+import re
+import datetime
+import json
+import requests
 
-WAIT = 10
+WAIT = 5
 
 def try_mkdir(path):
     try:
@@ -22,6 +25,42 @@ def should_upload(basename):
     return not basename.startswith('.') and \
         (basename.endswith('.warc.gz') or basename.endswith('.json') or basename.endswith('.txt'))
 
+def parse_name(basename):
+    k = re.split(r'(.*)-\w+-(\d{8})-\d{6}-[^.]*\.warc.gz', basename) # extract domain name and date
+    if len(k) != 4:
+        return {'dns': 'UNKNOWN', 'date': datetime.datetime.now().strftime("%Y%m%d")}
+
+    return {'dns': k[1], 'date': k[2]}
+
+def ia_upload_allowed(s3_url, accesskey, bucket = ''):
+    try:
+        resp = requests.get(url=(s3_url + '/?check_limit=1&accesskey={}&bucket={}'.format(accesskey, bucket)))
+        data = json.loads(resp.text)
+    except Exception as err:
+        print('Could not get throttling status - assuming IA is down')
+        return False
+
+    if 'over_limit' in data and data['over_limit'] is not 0:
+        print('IA S3 API notifies us we are being throttled (over_limit)')
+        return False
+
+    if 'detail' in data and 'rationing_engaged' in data['detail'] \
+            and data['detail']['rationing_engaged'] is not 0:
+        quota_our_remaining = data['detail']['accesskey_ration'] - data['detail']['accesskey_tasks_queued']
+        quota_global_remaining = data['detail']['total_global_limit'] - data['detail']['total_tasks_queued']
+        quota_bucket_remaining = data['detail']['bucket_ration'] - data['detail']['bucket_tasks_queued']
+        if quota_our_remaining < 10 or quota_global_remaining < 10 or quota_bucket_remaining < 5:
+            print('IA S3 API notifies us rationing is engaged with little room for new work!')
+            print('Our outstanding jobs: {}'.format(data['detail']['accesskey_tasks_queued']))
+            print('Our remaining quota: {}'.format(quota_our_remaining))
+            print('Global remaining quota: {}'.format(quota_global_remaining))
+            print('Limit reason given: {}'.format(data['detail']['limit_reason']))
+            return False
+        else:
+            print('IA S3 API notifies us rationing is engaged but we have '
+                  'room for another job.')
+
+    return True
+
 def main():
     if len(sys.argv) > 1:
@@ -30,17 +69,57 @@ def main():
         directory = os.environ['FINISHED_WARCS_DIR']
     else:
         raise RuntimeError('No directory specified (set FINISHED_WARCS_DIR '
-            'or specify directory on command line)')
+                           'or specify directory on command line)')
 
+    mode = None #modes: 'rsync', 's3'
+
     url = os.environ.get('RSYNC_URL')
-    if url == None:
-        raise RuntimeError('RSYNC_URL not set')
-    if '/localhost' in url or '/127.' in url:
-        raise RuntimeError("Won't let you upload to localhost because I "
-            "remove files after uploading them, and you might be uploading "
-            "to the same directory")
-
-    print("CHECK THE UPLOAD TARGET: %s" % (url,))
+    if url != None:
+        if '/localhost' in url or '/127.' in url:
+            raise RuntimeError('Won\'t let you upload to localhost because I '
+                               'remove files after uploading them, and you '
+                               'might be uploading to the same directory')
+        mode = 'rsync'
+
+    if url is None:
+        url = os.environ.get('S3_URL')
+        if url is not None:
+            mode = 's3'
+
+    if url is None:
+        raise RuntimeError('Neither RSYNC_URL nor S3_URL are set - nowhere to '
+                           'upload to. Hint: use'
+                           'S3_URL=https://s3.us.archive.org')
+
+    if mode == 's3': #parse IA-S3-specific options
+        ia_collection = os.environ.get('IA_COLLECTION')
+        if ia_collection is None:
+            raise RuntimeError('Must specify IA_COLLECTION if using IA S3 '
+                               '(hint: ArchiveBot)')
+
+        ia_item_title = os.environ.get('IA_ITEM_TITLE')
+        if ia_item_title is None:
+            raise RuntimeError('Must specify IA_ITEM_TITLE if using IA S3 '
+                               '(hint: "Archiveteam: Archivebot $pipeline_name '
+                               'GO Pack")')
+
+        ia_auth = os.environ.get('IA_AUTH')
+        if ia_auth is None:
+            raise RuntimeError('Must specify IA_AUTH if using IA S3 '
+                               '(hint: access_key:secret_key)')
+
+        ia_item_prefix = os.environ.get('IA_ITEM_PREFIX')
+        if ia_auth is None:
+            raise RuntimeError('Must specify IA_ITEM_PREFIX if using IA S3 '
+                               '(hint: archiveteam_archivebot_go_$pipeline_name'
+                               '_}')
+
+        ia_access = os.environ.get('IA_ACCESS')
+        if ia_access is None:
+            raise RuntimeError('Must specify IA_ACCESS if using IA S3 '
+                               '(hint: your access key)')
+
+    print("CHECK THE UPLOAD TARGET: %s as %s endpoint" % (url, mode))
     print()
     print("Upload target must reliably store data")
     print("Each local file will removed after upload")
@@ -68,10 +147,43 @@ def main():
             print("Could not rename %r - another uploader probably grabbed it" % (fname_d,))
         else:
             print("Uploading %r" % (fname_u,))
-            exit = subprocess.call([
-                "rsync", "-av", "--timeout=300", "--contimeout=300",
-                "--progress", fname_u, url])
-            if exit == 0:
+
+            item = parse_name(basename)
+            ia_upload_bucket = re.sub(r'[^0-9a-zA-Z-]+', '_', ia_item_prefix + '_' + item['dns'] + '_' + item['date'])
+
+            if mode == 'rsync':
+                exit_code = subprocess.call([
+                    "rsync", "-av", "--timeout=300", "--contimeout=300",
+                    "--progress", fname_u, url])
+            elif ia_upload_allowed(url, ia_access, ia_upload_bucket): #mode=='s3' and IA is not throttling
+                # At some point, an ambitious person could try a file belonging in a different bucket if ia_upload_allowed denied this one
+                size_hint = str(os.stat(fname_u).st_size)
+                target = url + '/' + ia_upload_bucket + '/' + \
+                    re.sub(r'[^0-9a-zA-Z-.]+', '_', basename)
+
+                exit_code = subprocess.call([
+                    "curl", "-v", "--location", "--fail",
+                    "--speed-limit", "1", "--speed-time", "900",
+                    "--header", "x-archive-queue-derive:1",
+                    "--header", "x-amz-auto-make-bucket:1",
+                    "--header", "x-archive-meta-collection:" + ia_collection,
+                    "--header", "x-archive-meta-mediatype:web",
+                    "--header", "x-archive-meta-subject:archivebot",
+                    "--header", "x-archive-meta-title:" + ia_item_title +
+                        ' ' + item['dns'] + ' ' + item['date'],
+                    "--header", "x-archive-meta-date:" +
+                        item['date'][0:4] + '-' +
+                        item['date'][4:6] + '-' +
+                        item['date'][6:8],
+                    "--header", "x-archive-size-hint:" + size_hint,
+                    "--header", "authorization: LOW " + ia_auth,
+                    "-o", "/dev/stdout",
+                    "--upload-file", fname_u,
+                    target])
+            else: #no upload mechanism available
+                exit_code = 1
+
+            if exit_code == 0:
                 print("Removing %r" % (fname_u,))
                 os.remove(fname_u)
             else:
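To see what the new naming logic produces end to end, here is a short sketch of parse_name plus the bucket derivation. The filename and prefix are invented for illustration, following the pattern the regex expects:

    import re

    basename = 'example.com-inf-20160706-123456-3z9qk-00000.warc.gz'  # hypothetical
    ia_item_prefix = 'archiveteam_archivebot_go_pipeline1'            # hypothetical

    k = re.split(r'(.*)-\w+-(\d{8})-\d{6}-[^.]*\.warc.gz', basename)
    item = {'dns': k[1], 'date': k[2]}  # {'dns': 'example.com', 'date': '20160706'}

    # Bucket name: prefix + domain + date, with runs of disallowed characters
    # collapsed to underscores (dots included, so 'example.com' -> 'example_com').
    ia_upload_bucket = re.sub(r'[^0-9a-zA-Z-]+', '_',
                              ia_item_prefix + '_' + item['dns'] + '_' + item['date'])
    # -> 'archiveteam_archivebot_go_pipeline1_example_com_20160706'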
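ia_upload_allowed drives its throttling decision off IA's check_limit endpoint. The response shape below is inferred solely from the fields the function reads; treat it as an assumption, not a documented schema:

    # Hypothetical check_limit response; real responses may differ.
    example = {
        'over_limit': 0,
        'detail': {
            'rationing_engaged': 1,
            'accesskey_ration': 20,
            'accesskey_tasks_queued': 3,
            'total_global_limit': 500,
            'total_tasks_queued': 120,
            'bucket_ration': 10,
            'bucket_tasks_queued': 1,
            'limit_reason': '',
        },
    }
    # Remaining quotas: 20 - 3 = 17, 500 - 120 = 380, 10 - 1 = 9. None fall
    # below the thresholds (10, 10 and 5), so this upload would be allowed.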
