
Rename cli utils
crocoite-recursive is now just crocoite; crocoite-grab is no longer
user-facing and is now called crocoite-single.

In preparation for 1.0 release.
PromyLOPh committed Jul 4, 2019
1 parent 123b370 commit ec5e6a0
Showing 6 changed files with 127 additions and 98 deletions.
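In practice the rename looks like this at the shell (a sketch inferred from the argument changes in the diff below, with http://example.com/ standing in for any seed URL):

    # before this commit
    crocoite-grab http://example.com/ example.com.warc.gz
    crocoite-recursive --policy prefix http://example.com/ output-dir

    # after this commit
    crocoite-single http://example.com/ example.com.warc.gz
    crocoite --recursion prefix http://example.com/ 'output/{host}-{date}-{seqnum}.warc.gz'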
2 changes: 1 addition & 1 deletion contrib/dashboard.js
@@ -72,7 +72,7 @@ ws.onmessage = function (event) {
} else if (rmsg.uuid == '5b8498e4-868d-413c-a67e-004516b8452c') {
/* recursion status */
Object.assign (j.stats, rmsg);
} else if (rmsg.uuid == '1680f384-744c-4b8a-815b-7346e632e8db') {
} else if (rmsg.uuid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f') {
/* fetch */
j.addUrl (rmsg.url);
}
101 changes: 36 additions & 65 deletions crocoite/cli.py
@@ -42,6 +42,13 @@
WarcHandlerConsumer, Level
from .devtools import Crashed

def absurl (s):
""" argparse: Absolute URL """
u = URL (s)
if u.is_absolute ():
return u
raise argparse.ArgumentTypeError ('Must be absolute')

class SingleExitStatus(IntEnum):
""" Exit status for single-shot command line """
Ok = 0
@@ -50,21 +57,8 @@ class SingleExitStatus(IntEnum):
Navigate = 3

def single ():
"""
One-shot command line interface and pywb_ playback:
.. code:: bash
pip install pywb
crocoite-grab http://example.com/ example.com.warc.gz
rm -rf collections && wb-manager init test && wb-manager add test example.com.warc.gz
wayback &
$BROWSER http://localhost:8080
.. _pywb: https://github.com/ikreymer/pywb
"""
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
parser.add_argument('--browser', help='DevTools URL', metavar='URL')
parser = argparse.ArgumentParser(description='crocoite helper tools to fetch individual pages.')
parser.add_argument('--browser', help='DevTools URL', type=absurl, metavar='URL')
parser.add_argument('--timeout', default=1*60*60, type=int, help='Maximum time for archival', metavar='SEC')
parser.add_argument('--idle-timeout', default=30, type=int, help='Maximum idle seconds (i.e. no requests)', dest='idleTimeout', metavar='SEC')
parser.add_argument('--behavior', help='Enable behavior script',
@@ -77,7 +71,7 @@ def single ():
parser.add_argument('-k', '--insecure',
action='store_true',
help='Disable certificate validation')
parser.add_argument('url', help='Website URL', type=URL, metavar='URL')
parser.add_argument('url', help='Website URL', type=absurl, metavar='URL')
parser.add_argument('output', help='WARC filename', metavar='FILE')

args = parser.parse_args ()
@@ -135,50 +129,40 @@ def parsePolicy (recursive, url):
return DepthLimit (int (recursive))
elif recursive == 'prefix':
return PrefixLimit (url)
raise ValueError ('Unsupported')
raise argparse.ArgumentTypeError ('Unsupported recursion mode')

def recursive ():
"""
crocoite is built with the Unix philosophy (“do one thing and do it well”) in
mind. Thus ``crocoite-grab`` can only save a single page. If you want recursion
use ``crocoite-recursive``, which follows hyperlinks according to ``--policy``.
It can either recurse a maximum number of levels or grab all pages with the
same prefix as the start URL:
.. code:: bash
crocoite-recursive --policy prefix http://www.example.com/dir/ output
will save all pages in ``/dir/`` and below to individual files in the output
directory ``output``. You can customize the command used to grab individual
pages by appending it after ``output``. This way distributed grabs (ssh to a
different machine and execute the job there, queue the command with Slurm, …)
are possible.
"""

logger = Logger (consumer=[DatetimeConsumer (), JsonPrintConsumer ()])

parser = argparse.ArgumentParser(description='Recursively run crocoite-grab.')
parser.add_argument('--policy', help='Recursion policy', metavar='POLICY')
parser.add_argument('--tempdir', help='Directory for temporary files', metavar='DIR')
parser.add_argument('--prefix', help='Output filename prefix, supports templates {host} and {date}', metavar='FILENAME', default='{host}-{date}-')
parser.add_argument('--concurrency', '-j', help='Run at most N jobs', metavar='N', default=1, type=int)
parser.add_argument('url', help='Seed URL', type=URL, metavar='URL')
parser.add_argument('output', help='Output directory', metavar='DIR')
parser.add_argument('command', help='Fetch command, supports templates {url} and {dest}', metavar='CMD', nargs='*', default=['crocoite-grab', '{url}', '{dest}'])
parser = argparse.ArgumentParser(description='Save website to WARC using Google Chrome.')
parser.add_argument('-j', '--concurrency',
help='Run at most N jobs concurrently', metavar='N', default=1,
type=int)
parser.add_argument('-r', '--recursion', help='Recursion policy',
metavar='POLICY')
parser.add_argument('--tempdir', help='Directory for temporary files',
metavar='DIR')
parser.add_argument('url', help='Seed URL', type=absurl, metavar='URL')
parser.add_argument('output',
help='Output file, supports templates {host}, {date} and {seqnum}',
metavar='FILE')
parser.add_argument('command',
help='Fetch command, supports templates {url} and {dest}',
metavar='CMD', nargs='*',
default=['crocoite-single', '{url}', '{dest}'])

args = parser.parse_args ()
try:
policy = parsePolicy (args.policy, args.url)
except ValueError:
parser.error ('Invalid argument for --policy')
policy = parsePolicy (args.recursion, args.url)
except argparse.ArgumentTypeError as e:
parser.error (str (e))

os.makedirs (args.output, exist_ok=True)

controller = RecursiveController (url=args.url, output=args.output,
command=args.command, logger=logger, policy=policy,
tempdir=args.tempdir, prefix=args.prefix,
concurrency=args.concurrency)
try:
controller = RecursiveController (url=args.url, output=args.output,
command=args.command, logger=logger, policy=policy,
tempdir=args.tempdir, concurrency=args.concurrency)
except ValueError as e:
parser.error (str (e))

run = asyncio.ensure_future (controller.run ())
loop = asyncio.get_event_loop()
@@ -191,19 +175,6 @@ def recursive ():
return 0

def irc ():
"""
A simple IRC bot (“chromebot”) is provided with the command ``crocoite-irc``.
It reads its configuration from a config file like the example provided in
``contrib/chromebot.json`` and supports the following commands:
a <url> -j <concurrency> -r <policy>
Archive <url> with <concurrency> processes according to recursion <policy>
s <uuid>
Get job status for <uuid>
r <uuid>
Revoke or abort running job with <uuid>
"""

import json, re
from .irc import Chromebot

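Both entry points now validate URLs with the new absurl argparse type, so a relative URL is rejected at parse time instead of failing later in the browser session. A sketch of the resulting behaviour; the exact wording comes from argparse:

    crocoite-single example.com out.warc.gz
    # exits immediately with: error: argument URL: Must be absolute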
72 changes: 55 additions & 17 deletions crocoite/controller.py
@@ -22,7 +22,7 @@
Controller classes, handling actions required for archival
"""

import time, tempfile, asyncio, json, os
import time, tempfile, asyncio, json, os, shutil
from itertools import islice
from datetime import datetime
from operator import attrgetter
@@ -355,6 +355,10 @@ def __init__ (self, prefix):
def __call__ (self, urls):
return set (filter (lambda u: str(u.value).startswith (str (self.prefix)), urls))

def hasTemplate (s):
""" Return True if string s has string templates """
return '{' in s and '}' in s

class RecursiveController:
"""
Simple recursive controller
@@ -363,31 +367,36 @@ class RecursiveController:
"""

__slots__ = ('url', 'output', 'command', 'logger', 'policy', 'have',
'pending', 'stats', 'prefix', 'tempdir', 'running', 'concurrency')
'pending', 'stats', 'tempdir', 'running', 'concurrency',
'copyLock')

SCHEME_WHITELIST = {'http', 'https'}

def __init__ (self, url, output, command, logger, prefix='{host}-{date}-',
def __init__ (self, url, output, command, logger,
tempdir=None, policy=DepthLimit (0), concurrency=1):
self.url = url
self.output = output
self.command = command
self.prefix = prefix
self.logger = logger.bind (context=type(self).__name__, seedurl=url)
self.policy = policy
self.tempdir = tempdir
# A lock if only a single output file (no template) is requested
self.copyLock = None if hasTemplate (output) else asyncio.Lock ()
# some sanity checks. XXX move to argparse?
if self.copyLock and os.path.exists (self.output):
raise ValueError ('Output file exists')
# tasks currently running
self.running = set ()
# max number of tasks running
self.concurrency = concurrency
# keep in sync with StatsHandler
self.stats = {'requests': 0, 'finished': 0, 'failed': 0, 'bytesRcv': 0, 'crashed': 0, 'ignored': 0}

async def fetch (self, entry):
async def fetch (self, entry, seqnum):
"""
Fetch a single URL using an external command
command is usually crocoite-grab
command is usually crocoite-single
"""

assert isinstance (entry, SetEntry)
@@ -403,8 +412,9 @@ def formatCommand (e):
else:
return e.format (url=url, dest=dest.name)

def formatPrefix (p):
return p.format (host=url.host, date=datetime.utcnow ().isoformat ())
def formatOutput (p):
return p.format (host=url.host,
date=datetime.utcnow ().isoformat (), seqnum=seqnum)

def logStats ():
logger.info ('stats', uuid='24d92d16-770e-4088-b769-4020e127a7ff', **self.stats)
@@ -417,14 +427,15 @@ def logStats ():
return

dest = tempfile.NamedTemporaryFile (dir=self.tempdir,
prefix=formatPrefix (self.prefix), suffix='.warc.gz',
delete=False)
destpath = os.path.join (self.output, os.path.basename (dest.name))
prefix=__package__, suffix='.warc.gz', delete=False)
command = list (map (formatCommand, self.command))
logger.info ('fetch', uuid='1680f384-744c-4b8a-815b-7346e632e8db', command=command, destfile=destpath)
logger.info ('fetch', uuid='d1288fbe-8bae-42c8-af8c-f2fa8b41794f',
command=command)
try:
process = await asyncio.create_subprocess_exec (*command, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL, stdin=asyncio.subprocess.DEVNULL,
process = await asyncio.create_subprocess_exec (*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
start_new_session=True, limit=100*1024*1024)
while True:
data = await process.stdout.readline ()
@@ -449,8 +460,33 @@ def logStats ():
finally:
code = await process.wait()
if code == 0:
# atomically move once finished
os.rename (dest.name, destpath)
if self.copyLock is None:
# atomically move once finished
lastDestpath = None
while True:
# XXX: must generate a new name every time, otherwise
# this loop never terminates
destpath = formatOutput (self.output)
assert destpath != lastDestpath
lastDestpath = destpath

# python does not have rename(…, …, RENAME_NOREPLACE),
# but this is safe nonetheless, since we’re
# single-threaded
if not os.path.exists (destpath):
# create the directory, so templates like
# /{host}/{date}/… are possible
os.makedirs (os.path.dirname (destpath), exist_ok=True)
os.rename (dest.name, destpath)
break
else:
# atomically (in the context of this process) append to
# existing file
async with self.copyLock:
with open (dest.name, 'rb') as infd, \
open (self.output, 'ab') as outfd:
shutil.copyfileobj (infd, outfd)
os.unlink (dest.name)
else:
self.stats['crashed'] += 1
logStats ()
@@ -464,6 +500,7 @@ def log ():
have=len (self.have)-len(self.running),
running=len (self.running))

seqnum = 1
try:
self.have = set ()
self.pending = set ([SetEntry (self.url, depth=0)])
@@ -472,8 +509,9 @@ def log ():
# since pending is a set this picks a random item, which is fine
u = self.pending.pop ()
self.have.add (u)
t = asyncio.ensure_future (self.fetch (u))
t = asyncio.ensure_future (self.fetch (u, seqnum))
self.running.add (t)
seqnum += 1

log ()

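The reworked controller picks its output mode from the filename alone, via hasTemplate above. A sketch of the two modes, assuming a crocoite built from this commit:

    # templated FILE: each finished page is renamed atomically into its own
    # WARC, and parent directories (e.g. out/{host}/) are created on demand
    crocoite http://example.com/ 'out/{host}/{date}-{seqnum}.warc.gz'

    # plain FILE: finished WARCs are appended to a single file under copyLock;
    # the file must not exist beforehand, otherwise __init__ raises ValueError
    crocoite http://example.com/ example.com.warc.gz

Appending is sound here because a gzip-compressed WARC is a sequence of independent gzip members, so concatenating finished records still yields a valid archive.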
19 changes: 11 additions & 8 deletions crocoite/irc.py
@@ -22,7 +22,7 @@
IRC bot “chromebot”
"""

import asyncio, argparse, json, tempfile, time, random
import asyncio, argparse, json, tempfile, time, random, os
from datetime import datetime
from urllib.parse import urlsplit
from enum import IntEnum, unique
@@ -500,17 +500,21 @@ async def handleArchive (self, user, args, reply):
'recursive': args.recursive,
'concurrency': args.concurrency,
}}
grabCmd = ['crocoite-grab']
grabCmd = ['crocoite-single']
grabCmd.extend (['--warcinfo',
'!' + json.dumps (warcinfo, cls=StrJsonEncoder)])
if args.insecure:
grabCmd.append ('--insecure')
grabCmd.extend (['{url}', '{dest}'])
# prefix warcinfo with !, so it won’t get expanded
cmdline = ['crocoite-recursive', args.url, '--tempdir', self.tempdir,
'--prefix', j.id + '-{host}-{date}-', '--policy',
args.recursive, '--concurrency', str (args.concurrency),
self.destdir, '--'] + grabCmd
cmdline = ['crocoite',
'--tempdir', self.tempdir,
'--recursion', args.recursive,
'--concurrency', str (args.concurrency),
args.url,
os.path.join (self.destdir,
j.id + '-{host}-{date}-{seqnum}.warc.gz'),
'--'] + grabCmd

strargs = ', '.join (map (lambda x: '{}={}'.format (*x), showargs.items ()))
reply (f'{args.url} has been queued as {j.id} with {strargs}')
@@ -640,9 +644,8 @@ def handleStdin (self):
elif msgid == '5c0f9a11-dcd8-4182-a60f-54f4d3ab3687':
nesteddata = data['data']
nestedmsgid = nesteddata['uuid']
if nestedmsgid == '1680f384-744c-4b8a-815b-7346e632e8db':
if nestedmsgid == 'd1288fbe-8bae-42c8-af8c-f2fa8b41794f':
del nesteddata['command']
del nesteddata['destfile']

buf = json.dumps (data)
for c in self.clients:
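Concretely, chromebot now shells out to the renamed tools with one templated output path instead of the old --prefix plus destination-directory pair. An illustrative expansion of the assembled command line; JOBID, /tmp and /warcs are placeholders, and the warcinfo JSON is elided:

    crocoite --tempdir /tmp --recursion prefix --concurrency 1 \
        http://example.com/ \
        '/warcs/JOBID-{host}-{date}-{seqnum}.warc.gz' \
        -- crocoite-single --warcinfo '!…' '{url}' '{dest}'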
23 changes: 18 additions & 5 deletions doc/usage.rst
@@ -1,15 +1,28 @@
 Usage
 -----
 
-.. autofunction:: crocoite.cli.single
+One-shot command line interface and pywb_ playback:
 
-Recursion
-^^^^^^^^^
-.. autofunction:: crocoite.cli.recursive
+.. code:: bash
+
+    pip install pywb
+    crocoite http://example.com/ example.com.warc.gz
+    rm -rf collections && wb-manager init test && wb-manager add test example.com.warc.gz
+    wayback &
+    $BROWSER http://localhost:8080
+
+.. _pywb: https://github.com/ikreymer/pywb
 
 IRC bot
 ^^^^^^^
 
-.. autofunction:: crocoite.cli.irc
+A simple IRC bot (“chromebot”) is provided with the command ``crocoite-irc``.
+It reads its configuration from a config file like the example provided in
+``contrib/chromebot.json`` and supports the following commands:
+
+a <url> -j <concurrency> -r <policy>
+    Archive <url> with <concurrency> processes according to recursion <policy>
+s <uuid>
+    Get job status for <uuid>
+r <uuid>
+    Revoke or abort running job with <uuid>

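The new --recursion flag accepts either an integer depth limit or the literal policy name prefix, per parsePolicy in crocoite/cli.py. Two examples sketched from this commit's argparse definitions:

    # follow links up to two levels away from the seed
    crocoite -r 2 http://example.com/ '{host}-{date}-{seqnum}.warc.gz'

    # archive every page whose URL starts with the seed's prefix
    crocoite -r prefix http://www.example.com/dir/ 'dir-{host}-{seqnum}.warc.gz'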