
Problem: starting Python and importing Django in many gearman workers is inefficient and unnecessary #938

Open · jrwdunham opened this issue Feb 22, 2018 · 8 comments

@jrwdunham (Contributor) commented Feb 22, 2018

Profiling of Archivematica's MCPClient indicates that it takes roughly 1.25 seconds for a (Python-based) client script to be started in a new Python process, import Django (and other heavy modules), and create a database connection. Given a package creation event with very many files, each requiring several such client script process forks, this can add up to significant inefficiency.

In many cases this "cold startup" cost is unnecessary when the following could be done instead:

  1. Python client scripts could be directly (dynamically) imported into the long-running Python (MCP Client) Gearman worker process, instead of forking a new Python process for each short-lived client script run (see the sketch after this list).

  2. If the client script must run in a separate process, then we could avoid the Django import and db connection by having the long-running MCPClient process provide to the client scripts the state (e.g., database data, Django settings, unit variables) that they need. Once done, the client script could pass back (e.g., as JSON printed to stdout) directives for the MCPClient process to mutate the database appropriately.
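A minimal sketch of option 1 above, assuming a hypothetical convention where each converted client script module exposes a main(args) function (the module and function names here are illustrative, not the current layout):

```python
import importlib


def run_client_script_inline(module_name, args):
    """Run a client script inside the long-running worker instead of forking."""
    module = importlib.import_module(module_name)  # cached after the first import
    # Hypothetical convention: a converted script exposes main(args) and returns
    # an exit code instead of reading sys.argv and calling sys.exit().
    return module.main(args)


# e.g. exit_code = run_client_script_inline("normalize", ["file", "preservation"])
```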

For evidence of the problem, see the following figure and the AM acceptance tests issue 62:
[Figure: normalize-flame-graph-analysis.jpg — flame graph of the normalize client script]



@marktriggs (Contributor) commented Mar 7, 2018

This issue has morphed into looking at end-to-end performance of transfers and ingests, with an eye to making the improvements Joel describes above. We've had a lot of discussion via email, and @jhsimpson suggested moving the discussion here, so I'm going to try to give a potted history.

The crux of the proposal above was to switch MCP Client scripts from forking processes to loading them into a long-running Python instance where possible. To check the feasibility of the approach, we (@payten, @jambun, @marktriggs) put together a quick proof-of-concept (https://github.com/hudmol/archivematica/tree/inline-mcp-client) that used runpy (https://docs.python.org/2/library/runpy.html) to load the client scripts, performing a bit of mangling along the way to deal with command-line arguments and capturing stdout/stderr.
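For reference, a stripped-down version of that kind of runpy mangling might look like the following (Python 3 syntax; the real branch linked above handles more edge cases):

```python
import runpy
import sys
from io import StringIO


def run_script(script_path, argv):
    """Run a client script in-process, capturing stdout/stderr and the exit code."""
    saved = sys.argv, sys.stdout, sys.stderr
    sys.argv = [script_path] + list(argv)
    sys.stdout, sys.stderr = StringIO(), StringIO()
    exit_code = 0
    try:
        runpy.run_path(script_path, run_name="__main__")
    except SystemExit as exc:
        # Scripts call sys.exit(); capture the status rather than killing the worker.
        exit_code = 0 if exc.code is None else exc.code
    finally:
        out, err = sys.stdout.getvalue(), sys.stderr.getvalue()
        sys.argv, sys.stdout, sys.stderr = saved
    return exit_code, out, err
```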

Testing this with a 500 file transfer produced a surprising result: CPU time dropped in line with Joel's analysis above, but the overall duration of the transfer and ingest (in wall clock time) didn't budge. The CPU did less work in the same amount of time.

I put together some numbers and a screencast to show what I think is happening:

You can see my benchmark numbers here (500 JPEG files at 4kb each):

https://docs.google.com/spreadsheets/d/1dgfuaUiNjzO2UkCPjFbnNbwyQ9_MFOBtYXFFLbJN-FU/edit?usp=sharing

and since the results are pretty counter-intuitive (at least, they were to me), I've recorded a brief, handwaving, screencast to show what I think is going on:

http://tsp.nz/d/3aa53338ce2eb967f97f2f47790d3f098dd40181.mp4#/archivematica.mp4

I think the upshot of all of this is that we need to look at how the MCP Server is farming out its work: creating one Gearman job per file might actually be the limiting factor in how quickly we can push them through. If we could improve that then I think we'd see big wins from working on MCP Client too.

This led to more investigation of MCP Server--specifically, trying to find what was causing the MCP Client tasks to start 10s of milliseconds apart, when they could have run concurrently. Eventually that led to the spot where tasks are inserted into MySQL:

https://github.com/artefactual/archivematica/blob/stable/1.7.x/src/MCPServer/lib/linkTaskManagerFiles.py#L127

and we discovered that those inserts take about 140ms on average (historical revisionism note: in my email I originally said 10ms, but we now know it's worse than that). Changing the code to group those inserts into transactions is an easy fix and improves insert performance significantly.
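The fix is conceptually along these lines (a simplified sketch using Django's transaction API; the real code in linkTaskManagerFiles.py and the Task model fields differ):

```python
from django.db import transaction
from main.models import Task  # Archivematica's task model; fields simplified here


def insert_tasks(task_rows):
    """Insert all of a batch's task rows under a single transaction/commit."""
    with transaction.atomic():       # one COMMIT (and one fsync) for the whole batch
        for row in task_rows:        # instead of one commit per INSERT
            Task.objects.create(**row)

    # Where per-row behaviour isn't needed, a single multi-row INSERT is even better:
    # Task.objects.bulk_create([Task(**row) for row in task_rows])
```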

What we've found is that changing MCP Client to run its scripts inline can improve performance significantly, but only if you make the corresponding change to MCP Server to batch those insert statements into transactions. Either change in isolation has no effect, but apply both and you get a good performance boost (a 40% reduction in duration in my test).

With all of that, I think we've validated the performance benefits of further improving the MCP Client task handling. Discussing within the group, we generally agreed that runpy and the associated collection of hacks necessary to capture stdout/stderr, exit codes, etc. were Not Ideal. So, we propose that we start looking at defining an interface that the MCP Client scripts could ultimately be converted to use.

Our sense is that the ideal API for these scripts would:

  • have some structured way of passing arguments (avoiding serialising/deserialising)

  • have some way for the script to yield both an exit code and a more descriptive error status (retaining compatibility with the current approach while leaving room for improved error reporting later)

  • have streams for stdout/stderr for the script to send its output

  • (possibly?) have a way to provide a stdin stream (if such a thing is used anywhere or deemed useful...)

  • support some provision for batching--for example, the option of running the task against N inputs in a single shot and amortising the cost of overheads to improve throughput

@marktriggs (Contributor) commented Mar 13, 2018

Just a quick bit of additional information from some small benchmarks I ran locally. As before, running a transfer and ingest of 500 JPEG images.

  • running with the vanilla qa/1.x branch as of today, the whole thing takes 40 minutes

  • running vanilla qa/1.x but with fsync turned off in MySQL: 25 minutes

  • running with the inline-mcp-client branch (runpy hackery, rebased against qa/1.x): 22 minutes

  • running with the same inline-mcp-client branch and with fsync turned off in MySQL: 8 minutes

The inline-mcp-client branch is here: https://github.com/hudmol/archivematica/tree/inline-mcp-client, but is basically what we talked about before--using runpy to execute the MCP Client scripts in the current process instead of forking.

The fsync change runs mysqld using http://www.mcgill.org.za/stuff/software/nosync, which stops any COMMIT from calling fsync (which would hit the disk). Durability goes out the window, but it helps give a sense of how much time is spent just waiting for MySQL to handle a COMMIT (which is after every DB statement by default).

So roughly speaking, it seems like my 500 file ingest running qa/1.x is spending ~15 minutes waiting for MySQL commits, ~15 minutes reloading python/django/etc., ~10 minutes doing everything else. At least this suggests that future work on improving the MCP Client interface is likely to have a good payoff.

@sevein (Member) commented Mar 13, 2018

Should we have a maximum batch size? E.g. if you're processing 500 video files and we only write the tasks at the very end, then the dashboard isn't going to be able to report any status to the user until it's 100% complete.

I wish it were easier to make the inserts concurrently. Launching a thread only for that purpose is probably too expensive (particularly in the scenario where you have small files). Maybe a queue shared with another thread responsible for doing the inserts would be better? There is a nice read from the creator of SQLAlchemy (http://techspot.zzzeek.org/2015/02/15/asynchronous-python-and-databases/) that talks about asyncio - something I found surprising when I read it is that the queries are not I/O bound at all (well, maybe they are with autocommit?). I guess this means that in a threaded solution the GIL would become a bottleneck. Although you can also share a queue between two processes (https://docs.python.org/2/library/multiprocessing.html), so perhaps that could do it.
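A rough sketch of that shared-queue idea (purely illustrative; the batch size, model and wiring are made up):

```python
import queue
import threading

from django.db import transaction
from main.models import Task  # illustrative; real model/fields differ

task_queue = queue.Queue()
BATCH_SIZE = 50


def insert_worker():
    """Single writer thread: drain the queue and commit inserts in batches."""
    while True:
        rows = [task_queue.get()]             # block until at least one row arrives
        while len(rows) < BATCH_SIZE:
            try:
                rows.append(task_queue.get_nowait())
            except queue.Empty:
                break
        with transaction.atomic():            # one commit per batch
            Task.objects.bulk_create([Task(**r) for r in rows])


threading.Thread(target=insert_worker, daemon=True).start()
# Producers then just call task_queue.put(row_dict) and carry on.
```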

@marktriggs (Contributor) commented Mar 13, 2018

One stray thought on the dashboard: I guess it would be possible to have it set its transaction isolation to READ_UNCOMMITTED when polling for task statuses. That way, it would see the latest inserts even if the batch hadn't been committed yet. Batching probably is a good idea too, but just thought I'd note the idea :)
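For what it's worth, something like this on the dashboard's polling connection would do it (a sketch; MySQL syntax):

```python
from django.db import connection


def enable_dirty_reads():
    """Let status polling see task rows that are inserted but not yet committed."""
    with connection.cursor() as cursor:
        cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
```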

@marktriggs (Contributor) commented Mar 20, 2018

Hi all,

We've put together a summary of the analysis we've done so far and a proposal for next steps. Looking forward to chatting about it all soon!

Cheers,
James, Payten, Mark


Overview of current architecture

  • MCP Server figures out what to do and submits tasks via Gearman. Tasks can originate from watched directories, from the Dashboard, or can follow on from other tasks.

    Tasks operate at varying levels of granularity: sometimes at the level of the SIP/Transfer, often at the level of individual files. Each task submitted to Gearman is managed by an MCP Server thread that submits the task and waits for results. There is a global limit on how many tasks can run at once (75 by default).

  • MCP Client runs N worker threads, which read tasks from Gearman and fork the corresponding client script. When the client script finishes, its stdout/stderr and exit code are returned to Gearman, and finally logged to the database by the MCP Server thread.

Performance Analysis

The analysis performed so far (https://github.com/artefactual/archivematica/issues/938#issuecomment-372508380) suggests that the current architecture adds overhead in two main areas:

  • Interaction with the database is quite chatty: the MCP Server inserts one row per task, the MCP Client updates that row as each task is received, the MCP Client scripts may query and update the database, and the MCP Server updates its row to record task output. These interactions are repeated for each task.

    By default, each database update runs in its own transaction, with a commit after each statement. In local testing, these commits add an average overhead of 140ms to each update. To reduce this overhead, it is desirable to run as many updates as possible within a single transaction, but the "one task per file" aspect of the current architecture makes this difficult.

    Additionally, query performance is optimised when data is fetched from the database in batches. For example, fetching the FPR Rules for an entire file set would be more efficient than fetching one rule per file. Again, the current architecture precludes this because each task is processed with its own database connection, in isolation from other tasks (see the sketch after this list).

  • MCP Client scripts are continually reloaded, with each one re-establishing a connection to the database and loading supporting libraries such as Django. The original analysis in https://github.com/artefactual/archivematica/issues/938 shows that almost half the time of certain tasks is consumed by this.
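To illustrate the batched-fetch point above (a sketch only; the FPR model and field names are simplified):

```python
from fpr.models import FPRule  # names simplified for illustration

# Per-file (current architecture): one query, one connection, per file.
#   rule = FPRule.objects.get(purpose="preservation", format=file_format_version)


# Per-batch: one query for the whole file set, then cheap in-memory lookups.
def preservation_rules_for(format_versions):
    rules = FPRule.objects.filter(purpose="preservation", format__in=format_versions)
    return {rule.format_id: rule for rule in rules}
```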

Illustrating these overheads, we ran an experiment where we ingested 500 very small image files. First, we used a vanilla version of the current code running under a Docker environment. End-to-end, this took 40 minutes.

Next, we modified the test environment in two ways:

  • we modified MySQL to avoid flushing to disk (simulating a reduction in database commit overhead by essentially making commits a no-op)

  • we modified the MCP Client code to run client scripts inline instead of forking them (avoiding the repeated loading of Python/django/etc.).

Under this modified environment, the same ingest ran in 8 minutes. The two modifications contributed equally to this improvement in duration.

So, when dealing with small files, about 80% of the total time was spent on overhead: either waiting for the database to write to disk, or waiting for the MCP Client scripts to load and bootstrap.

To achieve something like this degree of improvement, it would be necessary to change the MCP Server and MCP Client programs to enable batch processing of files. When a client script is handed a single file to work on, it doesn't have much leeway to optimise its activities; but give it 1,000 files and it has much more flexibility: it can group its database reads and writes and amortise overheads across the entire batch.

Proposal

To move to a system where MCP Client scripts operate on batches of files, the immediate question is where to place responsibility for defining the batches. We propose that the MCP Server be responsible for this, creating batches of files needing processing based on rough heuristics (e.g. based on types of tasks, number of MCP Client worker machines available, and so on).

The goal of batching at the MCP Server level is to ensure that work can be effectively split to run across multiple machines in parallel. However, the most effective split will depend somewhat on the amount of work (in CPU time) there is to do.

For example, if an Archivematica installation running with three MCP Client machines has a group of nine videos to transcode, it makes sense to divide the work into three equally sized sets and share it across all three machines--with a fair amount of work to do on each file, the gains from concurrency outweigh the cost of coordination. However, if the task was to produce thumbnail versions of images, splitting the work to run on multiple machines might not really pay off until you had a few hundred images to process. In short, different types of work will have different ideal batch sizes.

Beyond the MCP Server breaking work into batches, our opinion is that the MCP Client scripts are best-placed to make processing decisions to optimise performance. The nature of the work performed by the MCP Client scripts is inherently heterogeneous and domain-specific, varying widely in duration and CPU usage. As such, it is difficult to make optimal resource allocation decisions without having knowledge of the exact nature of work being performed by a client script.

For example, the ffmpeg program automatically runs as many threads as the system has CPUs, so running concurrent ffmpeg processes doesn't improve throughput. Conversely, a program like ImageMagick runs in a single thread, and running multiple instances gives a linear increase in throughput. The client script that runs a given tool is in the best position to decide how to utilise system resources.

This leads to a departure from the current MCP Client, which runs multiple client scripts simultaneously. We propose instead that each MCP Client instance should run a single client script at a time, with that script taking responsibility for running its own threads in whatever way is optimal. This utilises CPUs more effectively--avoiding the situation where multiple "heavy hitting" client scripts try to run on every CPU, and also gives client scripts the freedom to batch their database accesses to reduce overhead.
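As a sketch of the kind of decision a converted client script could make for itself (the tool invocation and the jobs/job.args interface shown here are illustrative):

```python
import multiprocessing
import subprocess
from concurrent.futures import ThreadPoolExecutor


def call(jobs):
    """Thumbnail a batch of images with a single-threaded tool (ImageMagick-style)."""
    def thumbnail(job):
        src, dst = job.args[1], job.args[2]
        return subprocess.call(["convert", "-thumbnail", "100x100", src, dst])

    # The tool is single-threaded, so one worker per CPU gives a near-linear speedup.
    with ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        for job, exit_code in zip(jobs, pool.map(thumbnail, jobs)):
            job.set_status(exit_code)

    # An ffmpeg-based script, by contrast, would just loop over jobs sequentially,
    # since ffmpeg already spreads itself across every CPU.
```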

New MCP Client script interface

Existing Python client scripts will need to be converted to standalone modules, where each presents a single, standardised entry point:

call([jobs])

Each Job object is an atomic unit of work to be performed--for example, a file to be transcoded. It will provide (a sketch follows the list):

  • job.args -- an array containing what would have been sys.argv, though it doesn't need to be limited to strings.

  • job.set_status(int, code='some default') -- record an exit status and an optional status enumeration.

  • job.write_output(s) -- log standard output (can be called multiple times).

  • job.write_error(s) -- log standard error (can be called multiple times).
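One possible shape for the Job object and a converted script, as a sketch (the API that eventually landed in the batching PR may differ in detail):

```python
class Job(object):
    """One atomic unit of work handed to a client script (sketch only)."""

    def __init__(self, args):
        self.args = args          # what would have been sys.argv
        self.exit_code = None
        self.status_code = None
        self.output = []
        self.error = []

    def set_status(self, exit_code, code="success"):
        self.exit_code = exit_code
        self.status_code = code

    def write_output(self, s):
        self.output.append(s)

    def write_error(self, s):
        self.error.append(s)


# A converted client script module then exposes the single entry point:
def call(jobs):
    for job in jobs:
        try:
            do_work(job.args)     # hypothetical per-file work
            job.set_status(0)
        except Exception as err:
            job.write_error("%s\n" % err)
            job.set_status(1)
```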

Migration strategy for existing client scripts

  • Current scripts read sys.argv for parameters and call sys.exit/exit/quit to yield a status.

  • Steps to migrate an existing script might be (a worked example follows this list):

    • Wrap the whole thing in a class

    • Implement the call([jobs]) method

    • Wrap the body of the script in 'for job in jobs:'

    • Replace usages of sys.argv with job.args

    • Replace print "foo" with job.write_output("foo\n") (thinkme: newlines!)

    • Replace sys.exit/exit/quit with job.set_status(int)

      • This is tricky because exit/quit aborts execution and call() won't; we'll need to use the appropriate return/break/whatever.
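A before/after illustration of those steps (the script body is invented; the shape of the change is the point):

```python
# Before: standalone script, run in its own Python process
#
#   import sys
#   path = sys.argv[1]
#   print "checksumming %s" % path
#   sys.exit(do_checksum(path))


# After: module loaded into the worker and handed a batch of jobs
def call(jobs):
    for job in jobs:
        path = job.args[1]
        job.write_output("checksumming %s\n" % path)
        job.set_status(do_checksum(path))   # hypothetical existing helper
        # note: no sys.exit() here; use continue/return to skip or bail out early
```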

MCP Server changes

Figuring out the batches

  • First cut: set a global batch size to something reasonable--100 or something (see the sketch after this list).

  • Next: have a task-based suggested batch size that overrides the global default and is configurable on a per-site basis (which may be influenced by the number of clients running)

  • We should find a way of getting rid of all the threads--have an event loop that keeps track of which gearman jobs we're waiting on and polls.
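A first-cut sketch of the server-side batching (assuming the python-gearman client API already used by MCP Server; the wire format is hand-waved):

```python
import json

import gearman

BATCH_SIZE = 100  # global default; later overridable per task type


def submit_in_batches(task_name, file_tasks):
    """Submit one Gearman job per batch of files, rather than one per file."""
    client = gearman.GearmanClient(["localhost:4730"])
    requests = []
    for i in range(0, len(file_tasks), BATCH_SIZE):
        batch = file_tasks[i:i + BATCH_SIZE]
        requests.append(client.submit_job(
            task_name,
            json.dumps({"tasks": batch}),
            wait_until_complete=False,
        ))
    # An event loop can then poll these requests instead of parking one thread per job.
    return requests
```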

MCP Client changes

  • Now single-threaded. Just reads tasks off the gearman queue one at a time and invokes the right client script.

  • Gathers up the results once jobs have finished and saves them to the database.

  • Returns the exit status for each job to Gearman.
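And a corresponding sketch of the client-side loop (again assuming the python-gearman worker API; load_client_script, Job, save_results_to_database and the payload format are hypothetical):

```python
import json

import gearman


def handle_batch(gearman_worker, gearman_job):
    payload = json.loads(gearman_job.data)
    module = load_client_script(payload["script"])     # hypothetical loader
    jobs = [Job(task["args"]) for task in payload["tasks"]]
    module.call(jobs)                                  # run the whole batch inline
    save_results_to_database(jobs)                     # batched writes, one commit
    return json.dumps([job.exit_code for job in jobs])


worker = gearman.GearmanWorker(["localhost:4730"])
worker.register_task("run_client_script_batch", handle_batch)
worker.work()  # single-threaded: take one batch, run it, repeat
```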

Dashboard changes

  • The dashboard currently polls the database for status updates on individual tasks. Having it open a dedicated READ_UNCOMMITTED database session should allow it to continue to do this, even though those task statuses may not have yet been committed as a part of their batch.

FPR command changes?

Joel noted that FPR scripts might be a candidate for consideration too:
https://github.com/artefactual/archivematica/pull/995#discussion_r175556194.

@joel-simpson commented Mar 26, 2018

This proposal looks great. I won’t pretend to understand all of the technical details around threading and the approach to batching, etc., so I will defer to others on that. But the general direction makes a lot of sense to me.

I’d like to mention two benefits that I think this design could deliver, beyond the primary objective of improving scalability & performance.

  1. Usability benefit of determining FPR rules to run in advance (in a batch). Currently we run tasks that then determine that no FPR rule is required. On top of being inefficient, this creates a fair bit of noise in the results for users to filter through. You have to read stdout to realize that no rule was found and so nothing actually happened.
  2. Standardization of the outcome of jobs/tasks. Exit codes are not currently defined consistently, and are often limited to ‘pass’ or ‘fail’ (e.g. there is no standard for ‘completed with warnings’). I did some analysis to identify error-handling improvements for the Jisc project and this was one of the key barriers. Using a standard model or enumeration for task/job status would enable a whole range of improvements in how we aggregate and report preservation metrics in general, and error handling in particular (a sketch of one possible enumeration follows this list).
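For point 2, the kind of enumeration meant could be as simple as this (values are examples only, not a proposed standard):

```python
from enum import Enum


class TaskOutcome(Enum):
    SUCCESS = "success"
    COMPLETED_WITH_WARNINGS = "completed_with_warnings"
    FAILED = "failed"
    NO_RULE_APPLICABLE = "no_rule_applicable"  # e.g. no FPR rule matched the file
```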

A couple of comments on the proposed design:
I completely agree with Joel’s comment that we should have a standard interface for FPR commands. In fact I’d suggest we could have one for FPR rules (so you can say, I have the following files, tell me which rules apply to them) and one for FPR Commands (so that we can start to standardize the arguments & outputs used in commands and abstract out all the variations implemented by all of the different tools that we use).

This design and the idea of a standard interface for FPR map very closely to the conceptual design work we did for the PAR with Jisc (I don't want to complicate this with that proposed project, but I think it’s worth noting this is all a strong step in that direction, and I don't see anything here that I think we might regret later if we do take the PAR project forward). It would be great to leverage that design work to agree on good terminology going forward. I’m not sure if the terminology above (e.g. how you are using the word ‘jobs’) maps accurately to the current MCP Server schema. But I find that schema quite complex, so I think another general benefit of implementing a standard interface is that we can base it on a good domain model and ignore some of the complexities of the particular implementation of MCP Server.

Final point on the dashboard: if I understand the proposed change there, you’d be doing this so that you get more responsive / timely feedback in the GUI? So long as the dashboard continues to clearly show when it is ‘executing’ (doing something) versus when it is completed or awaiting user input, I don’t think increasing responsiveness on individual tasks is necessary. I don’t see that much benefit to users… indeed I wonder if there would be any, given that the dashboard currently only reports at the ‘job’ level (so a batch of ‘tasks’ for format identification, say)?


sevein added a commit that referenced this issue Jul 5, 2018

MCP task batching
Rework the MCP Server, MCP Client and MCP Client scripts to support batching tasks.

* The MCP Server now batches file-level tasks into fixed-size
    groups, creating one Gearman task per batch, rather than one per
    file.  It also uses fixed-size thread pools to limit contention
    between threads.

* The MCP Client now operates in batches, processing one batch at a
    time.  It also supports running tasks using a pool of
    processes (improving throughput where tasks benefit from spanning
    multiple CPUs.)

* The MCP Client scripts now accept a batch of jobs and process them
    as a single unit.  There is a new `Job` API that provides a
    standard interface for these client scripts, and all scripts have
    been converted to use this.

Much more is documented in issue #938.

---

Squashed commits:

* Fix linting issues

* Fixes in exception logging

* Remove unused settings

* Add missing parameter

* Don't use __file__ on fork_runner because it gives us back the pyc path!

* Reduce the transaction scope of the email fail report

* Move transaction around clamscan jobs to only cover the creation of events generated by the jobs

* Improve logging of failures in modules running under fork_runner

* Remove unused setting that came back after rebasing

* Rework test_antivirus.py after archivespace_clamscan script changes

@sevein self-assigned this Jul 5, 2018

sevein added a commit that referenced this issue Jul 27, 2018

MCPClient: standardise global state in create_mets_v2
The `create_mets_v2` client script uses a bunch of global state for
things like counters and file IDs.  The original implementation could
rely on these globals being reset between runs (by virtue of running
the script in a new subprocess each time) but, after the batching
changes, this is no longer the case.

To make things more manageable, we pull all of this global stuff into
a single top-level object which gets replaced at the beginning of each
job run.  Where other modules call the functions of create_mets_v2,
they reset the global state as appropriate as well.

A secondary piece of shared state is the
`sharedVariablesAcrossModules` module, used by `create_mets_v2` and a
handful of other scripts.  Handling for this needed to be updated for
the batching changes in a small number of places.

This is connected to archivematica/Issues#42.
This is connected to archivematica/Issues#43.
This is connected to #938.

@sallain self-assigned this Oct 26, 2018
