Architecture: Use multiple cores to run link archiving in parallel #91

Open
pirate opened this issue Aug 30, 2018 · 15 comments
Labels
size: hard · status: backlog (Work is planned someday but is not the highest priority at the moment) · touches: configuration · touches: data/schema/architecture · why: performance (Intended to improve ArchiveBox speed or responsiveness)
Comments

@pirate (Member) commented Aug 30, 2018

Add a --parallel=8 CLI option that uses multiprocessing to download a large number of links in parallel. Default to the number of cores on the machine, and allow --parallel=1 to force it down to a single core.
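
For illustration, a minimal sketch of how such a flag could be wired up with the standard library (the archive_link helper and the flag wiring here are hypothetical, not ArchiveBox's actual CLI):

import argparse
import multiprocessing

def archive_link(url):
    # stand-in for ArchiveBox's real per-link archiving logic
    print(f'archiving {url} ...')

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--parallel', type=int, default=multiprocessing.cpu_count(),
                        help='number of worker processes (use 1 to disable parallelism)')
    parser.add_argument('urls', nargs='+')
    args = parser.parse_args()

    if args.parallel > 1:
        with multiprocessing.Pool(args.parallel) as pool:
            pool.map(archive_link, args.urls)
    else:
        for url in args.urls:
            archive_link(url)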

pirate added the size: hard, status: idea-phase (Work is tentatively approved and is being planned / laid out, but is not ready to be implemented yet), and why: functionality (Intended to improve ArchiveBox functionality or features) labels on Aug 30, 2018
@pirate (Member, Author) commented Aug 30, 2018

I've had this on my mind for a while since it's super easy to implement, but https://github.com/aurelg/linkbak by @aurelg inspired me to actually open an issue for it.

@aurelg commented Aug 31, 2018

The relevant code is in this file:

    import contextlib        # imports used by this excerpt
    import multiprocessing
    import os

    nb_workers = args.j if args.j else os.cpu_count()
    get_logger().warning("Using %s workers", nb_workers)

    if nb_workers > 1:
        # spread the links across a pool of worker processes
        with contextlib.closing(multiprocessing.Pool(nb_workers)) as pool:
            pool.starmap(start_link_handler,
                         [(l, args) for l in get_links(args.file[0])])
    else:
        # single worker: just loop over the links sequentially
        for link in get_links(args.file[0]):
            start_link_handler(link, args)

This is actually pretty easy. In your case, the only difficulty might be handling the screen output/progress bar properly: if different workers are updating the screen at the same time, it may quickly become a bit messy.

pirate removed the size: hard label on Sep 2, 2018
pirate added the size: medium, status: wip (Work is in-progress / has already been partially completed), and touches: data/schema/architecture labels, and removed the why: functionality and status: idea-phase labels, on Dec 7, 2018
@pirate (Member, Author) commented Jan 23, 2019

I think I can fix the parallel-process stdout multiplexing problem with one of two solutions:

  • a library like curtsies to show each stream in a grid, tail -f style
  • having each subprocess append (>>) to a logfile, but obtaining a lock for the duration of each link archive, so the output remains in contiguous chunks instead of being mixed together line by line:

some simplified pseudocode:

import fcntl
import multiprocessing
import sys

def archive_link(link):
    link, stdout = run_archive_methods(link)

    # hold an exclusive lock on the shared log while writing, so each link's
    # output stays in one contiguous chunk instead of interleaving line by line
    with open('data/logs/archive.log', 'a') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        f.write(stdout)
        fcntl.flock(f, fcntl.LOCK_UN)

if parallel > 1:
    pool = multiprocessing.Pool(processes=parallel)
    pool_status = pool.map_async(archive_link, links)
    last_pos = 0
    # tail the shared log and echo any newly-written chunks to stdout
    while not pool_status.ready():
        with open('data/logs/archive.log', 'r') as f:
            f.seek(last_pos)
            print(f.read())
            last_pos = f.tell()
        sys.stdout.flush()
        pool_status.wait(0.2)
else:
    for link in links:
        link, stdout = run_archive_methods(link)
        print(stdout)

@anarcat commented May 6, 2019

Parallel downloading is tricky - you need to be nice to remote hosts and not hit them too much. There's a per-host limit specified in the RFCs which most web browsers override; most clients do 7 or 10 requests per domain, IIRC.

But since you're crawling different URLs, you have a more complicated problem to deal with and can fetch more than 7-10 simultaneous queries globally... You need to be careful because it requires coordination among the workers as well, or a director that does the right thing. In my feed reader, I believe I just throw about that number of threads (with multiprocessing.Pool and pool.apply_async, see the short design discussion) at all sites and hope for the best, but that's clearly inefficient.

I came here because I was looking at a bug report about running multiple archivebox add commands in parallel: I think this is broken right now (see #234 for an example failure) because the index.json gets corrupted or stolen between processes. It would be nice to have at least a lock file to prevent such problems from happening.
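
For illustration, a minimal sketch of the lock-file idea using fcntl (the data/index.lock path and the write_index call are hypothetical, not ArchiveBox's actual code):

import contextlib
import fcntl

@contextlib.contextmanager
def index_lock(lockfile='data/index.lock'):
    # hold an exclusive advisory lock for the duration of the block,
    # so only one `archivebox add` process touches index.json at a time
    with open(lockfile, 'w') as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            yield
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)

# usage (write_index is a hypothetical helper):
# with index_lock():
#     write_index('data/index.json', entries)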

pirate changed the title from "Enable downloading of links in parallel" to "Architecture: Use multiple cores to run link archiving in parallel" on May 6, 2019
@LaserWires commented Sep 21, 2019

> Parallel downloading is tricky - you need to be nice to remote hosts and not hit them too much. There's a per-host limit specified in the RFCs which most web browsers override; most clients do 7 or 10 requests per domain, IIRC.
>
> But since you're crawling different URLs, you have a more complicated problem to deal with and can fetch more than 7-10 simultaneous queries globally... You need to be careful because it requires coordination among the workers as well, or a director that does the right thing. In my feed reader, I believe I just throw about that number of threads (with multiprocessing.Pool and pool.apply_async, see the short design discussion) at all sites and hope for the best, but that's clearly inefficient.
>
> I came here because I was looking at a bug report about running multiple archivebox add commands in parallel: I think this is broken right now (see #234 for an example failure) because the index.json gets corrupted or stolen between processes. It would be nice to have at least a lock file to prevent such problems from happening.

Most site-mirroring apps support downloading through proxies; it is a very common feature that could be implemented in ArchiveBox, in which case a large --parallel value would not be a problem. A list of proxies can let an archive operation use a very large --parallel value. Most web servers today have high-bandwidth connections and very capable hardware, so ArchiveBox users overloading them isn't likely to be an issue.

@pirate (Member, Author) commented Sep 22, 2019

It's not an issue of overloading ArchiveBox; it's an issue of overloading or hitting rate limits on the content servers, which piping through a proxy won't solve.

@karlicoss (Contributor) commented

Even more than 8 workers might make sense, because things are often blocked on IO with no throughput, e.g. pages that would time out. It might need some careful scheduling, but it would be very cool to have!
Another IMO useful thing is some sort of "pipeline" concurrency, e.g. one executor only archives the DOM and always runs in front, while the other executors run behind and handle singlepage/screenshots/media/etc., i.e. the slower but less essential bits. This would also make it easier to schedule the load depending on which archivers are IO- or CPU-bound.
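
A rough sketch of that pipeline idea, with two stages connected by a queue (the save_dom / run_slow_extractors stubs are placeholders, not ArchiveBox's actual extractor API):

import multiprocessing
import time

def save_dom(url):
    time.sleep(0.1)                    # stand-in for a quick DOM fetch/save
    print(f'[dom]  {url}')

def run_slow_extractors(url):
    time.sleep(1.0)                    # stand-in for screenshots/media/etc.
    print(f'[slow] {url}')

def dom_worker(urls, q):
    for url in urls:
        save_dom(url)                  # the essential step always runs in front
        q.put(url)                     # hand the URL off to the slower stage
    q.put(None)                        # sentinel: no more work

def slow_worker(q):
    while True:
        url = q.get()
        if url is None:
            q.put(None)                # re-queue the sentinel for sibling workers
            return
        run_slow_extractors(url)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    urls = [f'https://example.com/{i}' for i in range(5)]
    front = multiprocessing.Process(target=dom_worker, args=(urls, q))
    slow = [multiprocessing.Process(target=slow_worker, args=(q,)) for _ in range(2)]
    for p in (front, *slow):
        p.start()
    for p in (front, *slow):
        p.join()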

@pirate (Member, Author) commented Dec 10, 2020

A quick update for everyone watching this, v0.5.0 is going to be released soon with improvements to how ArchiveResults are stored (we moved them into the SqliteDB). This was a necessary blocker to fix before we can get around to parallel archiving in the next version.

v0.5.0 will be faster, but it won't have built-in concurrent archiving support yet; that will be the primary focus for v0.6.0. The plan is to add a background task queue handler like dramatiq, or more likely huey (because it has sqlite3 support, so we don't need to run redis).

Once we have the background task worker system in place, we can implement a worker pool for Chrome/playwright and each of the other extractor methods. Then archiving can run in parallel by default, handling roughly 5-10 sites at a time depending on the system resources available and how well the worker pool system performs for each extractor type. Huey and dramatiq both have built-in rate-limiting systems that will allow us to cap the number of concurrent requests going to each site or being handled by each extractor. There's still quite a bit of work left, but we're getting closer!

Having a background task system will also enable us to do many other cool things, like building the scheduled import system into the UI #578, using a single shared chrome process instead of relaunching chrome for each link, and many other small improvements to performance.
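
For reference, a huey task backed by SQLite could look roughly like this (the task name, options, and file paths are illustrative, not the planned ArchiveBox implementation):

# run workers with:  huey_consumer.py tasks.huey -w 4
from huey import SqliteHuey

huey = SqliteHuey(filename='data/queue.sqlite3')

@huey.task(retries=2, retry_delay=10)
def archive_url(url):
    # ... call the extractors for this URL here ...
    print(f'archiving {url}')

# enqueueing from `archivebox add` would then be a quick non-blocking call:
# archive_url('https://example.com')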

@pirate (Member, Author) commented Apr 12, 2021

With v0.6 released, we've taken another step towards the goal of using a message-passing architecture to fully support parallel archiving. v0.6 moves the last bit of ArchiveResult state into the SQLite3 db, where it can be managed with migrations and kept ACID-compliant.

The next step of the process is to implement a worker queue for DB writes, and have all writes made to Snapshot/ArchiveResult models processed in a single thread, opening up other threads to do things in parallel without locking the db anymore. Message passing is a big change though, so expect it to come in increments, with about 3~6 months of work to go depending on how much free time I have for ArchiveBox.

Side note: the UX of v0.6 is >10x faster in many other ways (web UI, indexing, management tasks, etc.); only archiving itself remains to be sped up now. You can also still attempt to run archivebox add commands in parallel. It's safe, and it already speeds up archiving a lot, but you may encounter occasional "database is locked" warnings that mean you have to restart stuck additions manually.
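
As a rough sketch of the single-writer idea (the table and helper names are made up for illustration): all model writes get pushed onto a queue, and one dedicated thread drains it so SQLite only ever sees a single writer.

import queue
import sqlite3
import threading

write_queue = queue.Queue()

def db_writer(db_path='index.sqlite3'):
    # the only place in the program that ever writes to the database
    conn = sqlite3.connect(db_path)
    while True:
        item = write_queue.get()
        if item is None:               # sentinel: shut the writer down
            break
        sql, params = item
        conn.execute(sql, params)
        conn.commit()
    conn.close()

writer = threading.Thread(target=db_writer, daemon=True)
writer.start()

# producer threads (extractors, importers, ...) just enqueue their writes:
# write_queue.put(("UPDATE snapshots SET title = ? WHERE url = ?", (title, url)))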

@runkaiz commented May 3, 2021

Sorry, quick question: I run ArchiveBox in a Docker container. Would allocating it more than one CPU core or thread currently give any performance gains?

@pirate (Member, Author) commented May 7, 2021

@1105420698, allocating more than 1 cpu is definitely still advised, as django will use all available cores to handle incoming requests in parallel, and a few of the extractors already take advantage of multiple cores to render pages faster (e.g. chrome).

ArchiveBox is already fairly multicore-capable (e.g. you can run multiple add or update threads at the same time), it's just a few remaining edge cases and highly-parallel write scenarios that will be improved by the pending message queue refactoring work.

pirate added the size: hard, status: backlog (Work is planned someday but is not the highest priority at the moment), touches: configuration, and why: performance (Intended to improve ArchiveBox speed or responsiveness) labels, and removed the size: medium and status: wip labels, on May 7, 2021
@pirate (Member, Author) commented Jun 30, 2021

Ok, I'm pretty set on using Huey at this point for the job scheduler: it can use SQLite, it comes with a great Django admin dashboard, and it supports nested tasks and mutexes.

https://github.com/boxine/django-huey-monitor/#screenshots

Here's the approach I'm thinking of to massage all critical operations into a message-passing / queue / worker arrangement in rough pseudocode:

  1. archivebox add --depth=1 'https://example.com/feed.xml' leads to these tasks being triggered and handled in this order ->
# global mutexes that workers will use to limit number of concurrent operations at a time, when they use a resource that is constrained (e.g. disk IO, network, CPU, etc.)

GLOBAL_FS_ATOMIC_MUTEX = Pool('fs:atomic', timeout=120, max_concurrent=3)                            # increase these on fast SSDs, decrease them on slow spinning drives
GLOBAL_FS_CHOWN_MUTEX = Pool('fs:chown', timeout=120, max_concurrent=3)

GLOBAL_DB_WRITE_MUTEX = Pool('db:write', timeout=120, max_concurrent=1)                              # raise this if your db can handle concurrent writes at all (i.e. not sqlite)

GLOBAL_DB_TABLE_SNAPSHOT_MUTEX = Pool('db_snapshot:write', timeout=360, max_concurrent=1)            # raise these if your db can handle concurrent writes to the same table
GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX = Pool('db_archiveresult:write', timeout=360, max_concurrent=1)

DB_ROW_SNAPSHOT_MUTEX = lambda url: Pool(f'db_snapshot:write:{url}', timeout=360, max_concurrent=1)  # raise these if your db can handle concurrent writes to the same row (probably a bad idea)
DB_ROW_ARCHIVERESULT_MUTEX = lambda url: Pool(f'db_archiveresult:write:{url}', timeout=360, max_concurrent=1)

GLOBAL_SNAPSHOT_PULL_MUTEX = Pool('snapshot:pull', timeout=800000, max_concurrent=4)                 # raise this if you want to snapshot more URLs in parallel

GLOBAL_EXTRACTOR_MUTEX = Pool('extractor:run', timeout=234234234, max_concurrent=12)                 # only allow 12 extractors to run at a time globally

GLOBAL_PER_DEPENDENCY_MUTEX = lambda dependency: Pool(f'extractor:run_dependency:{dependency}', timeout=234234234, max_concurrent=4)  # only allow 4 of each type of extractor dependency to run at a time

PER_DOMAIN_RATELIMIT_MUTEX = lambda domain: Pool(f'domain:pull:{domain}', timeout=3600, max_concurrent=3, rate_limit=sliding_window(...))  # raise this if the domain you're archiving can handle lots of concurrent requests

GLOBAL_SHELL_CMD_MUTEX = Pool('system:shell_cmd', timeout=234234234, max_concurrent=8)                 # maximum number of shell cmds/external binaries to execute at once


await CLI.ArchiveboxAdd(urls_text, depth=1) {
    await parsers.Parse(urls_text, depth) { ... }
    
    await models.Snapshot.bulk_update_snapshots(snapshots) {
    	await parallel([
            models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)]),
            models.Snapshot.update_or_create(snapshot2, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot2.url)]),
            models.Snapshot.update_or_create(snapshot3, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot3.url)]),
        ])
    }
    
    await index.update_main_index(snapshots) {
        await parallel(max=4, [
            models.Snapshot.write(snapshot1) {
                await models.Snapshot.update_or_create(snapshot1, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_SNAPSHOT_MUTEX, DB_ROW_SNAPSHOT_MUTEX(snapshot1.url)])
                await models.Snapshot.update_filesystem(snapshot1) {
                    await parallel([
                        system.atomic_write('index.json', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                        system.atomic_write('index.html', acquire=[GLOBAL_FS_ATOMIC_MUTEX]),
                    ])
                }
            },
            models.Snapshot.write(snapshot2) {
                ...
            },
            ...
        ])
    }

           
    await extractors.save_snapshots(snapshots) {
    
      await extractors.pull_snapshot(snapshot1, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) {
                
         await parallel(max_concurrency=2, [
            
			extractors.extract(snapshot1, 'wget', acquire=[PER_DOMAIN_RATELIMIT_MUTEX(snapshot1.domain), GLOBAL_EXTRACTOR_MUTEX]) {
                await extractors.run_dependency('wget', acquire=[GLOBAL_PER_DEPENDENCY_MUTEX('wget')]) {
                    system.run_shell_cmd('wget ...', acquire=[GLOBAL_SHELL_CMD_MUTEX])
                    system.chown(result.output_path, acquire=[GLOBAL_FS_CHOWN_MUTEX])
                    system.dir_size(result.output_path)
                }

                await models.ArchiveResult.write(snapshot, result) {
                    models.ArchiveResult.get_or_create(snapshot, result, acquire=[GLOBAL_DB_WRITE_MUTEX, GLOBAL_DB_TABLE_ARCHIVERESULT_MUTEX])
                    models.Snapshot.write(snapshot, merge_results=results)
                        ...
                    index.add_index_texts(result=result) &
                }
            },
                    
                
            extractors.extract(snapshot1, extractor='git', acquire=[...]) {
                extractors.run_dependency('git')
                    system.run_shell_cmd('git ...')
                    system.chown(result.output_path)
                    system.dir_size(result.output_path)
                    
                models.ArchiveResult.write(snapshot, result)
                    ...
            },
                        
            extractors.extract(snapshot1, extractor='playwright_screenshot', acquire=[...]) {
                extractors.run_dependency('playwright')
                    context.assert_on_url(snapshot1.url)
                    context.assert_loaded()
                    context.run_js(snapshot1, 'screenshot.js')
                    system.chown(result.output_path)
                    system.dir_size(result.output_path)
                 
                models.ArchiveResult.write(snapshot, result)
                    ...
			
			},
			
	    ])
			
            
        extractors.pull_snapshot(snapshot2, acquire=[GLOBAL_SNAPSHOT_PULL_MUTEX]) &
        
            extractor_pool = Pool(max_concurrency=2, key=f'extractors.extract({snapshot2.url})')
        
            models.Snapshot.write(snapshot2)
                ...
            extractors.extract(snapshot2, extractor='wget')
                ...
            extractors.extract(snapshot2, extractor='git')
               ...
            ...
        ...
      

WIP ignore this, just toying around with different patterns / styles to find something with good ergonomics:

run({task: 'cli.archivebox_add', urls: 'https://example.com', depth: 1})

def add(urls: str='', depth=0, extractors='all', parent_task: str='main.add'):
    parsed_urls = await call({task: 'parsers.parse', urls: urls, parent_task: parent_task})
    snapshots = []
    
    # create Snapshots and update/create the snapshot indexes <timestamp>/index.{json,html}
    for url in parsed_urls:
        snapshot, created = await call({task: 'Snapshot.get_or_create', url: url})
        call({task: 'index.update_snapshot_index', snapshot: snapshot})
        snapshots.append(snapshot)
    
    # run the extractors for each new snapshot
    for snapshot in snapshots:
        schedule({task: 'extractors.run_extractors', snapshot: snapshot, extractors: 'all', overwrite: false})
        

I'm worried that the heavy reliance on mutexes and locking will lead to difficult-to-debug deadlock scenarios where parents spawn children that eat up all the worker slots and are then unable to complete, causing the parent to time out and force-kill those workers prematurely.


I also reached out to the folks who are building django-huey-monitor as it looks like a great fit for our job handling UI: boxine/django-huey-monitor#40
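
To make the Pool(...) mutex idea above concrete, here is a tiny standard-library sketch using bounded semaphores shared via a pool initializer (the names and limits mirror the pseudocode above but are purely illustrative; the real implementation would presumably use huey's locking primitives instead):

import multiprocessing
import time

def init_worker(extractor_sem, db_sem):
    # stash the shared semaphores in each worker process's globals
    global EXTRACTOR_MUTEX, DB_WRITE_MUTEX
    EXTRACTOR_MUTEX, DB_WRITE_MUTEX = extractor_sem, db_sem

def archive_one(url):
    with EXTRACTOR_MUTEX:              # blocks if too many extractors are already running
        time.sleep(0.1)                # ... run wget / chrome / etc. here ...
    with DB_WRITE_MUTEX:               # sqlite tolerates only one writer at a time
        pass                           # ... write the ArchiveResult row here ...

if __name__ == '__main__':
    extractor_sem = multiprocessing.BoundedSemaphore(12)   # like GLOBAL_EXTRACTOR_MUTEX
    db_sem = multiprocessing.BoundedSemaphore(1)           # like GLOBAL_DB_WRITE_MUTEX
    urls = [f'https://example.com/{i}' for i in range(20)]
    with multiprocessing.Pool(8, initializer=init_worker,
                              initargs=(extractor_sem, db_sem)) as pool:
        pool.map(archive_one, urls)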

@jgoerzen commented Jul 5, 2021

Over in #781 it was stated that parallel adds don't work yet. Over at https://github.com/ArchiveBox/ArchiveBox/wiki/Usage#large-archives there is an example of doing this that should probably be removed until this is fixed.

@pirate (Member, Author) commented Jul 6, 2021

It works better in some cases (fast SSDs) than others, so it's still worth trying. It shouldn't be dangerous to data integrity; it'll just lock up if it's on a slow filesystem. I added a note to the Usage page.

@pirate (Member, Author) commented Apr 12, 2022

Note I've added a new DB/filesystem troubleshooting area to the wiki that may help people arriving here from Google: https://github.com/ArchiveBox/ArchiveBox/wiki/Upgrading-or-Merging-Archives#database-troubleshooting

Contributions/suggestions welcome there.
