
Add command for checking if job is still in queue #81

Closed
jonhoo opened this issue Nov 8, 2017 · 15 comments

@jonhoo (Collaborator) commented Nov 8, 2017

Polling is a relatively common way of checking for job completion, but Faktory does not currently provide a way for applications to do that. To support this, it'd be useful to have an API call that checks whether a given job is still enqueued (i.e., has not been processed yet). Something like:

ENQUEUED {"queue": "<queue>", "jid": "<jid>"}

Which returns one of

+gone
+queued
+dead

queue should default to default if not given. We need the queue to be explicitly named to avoid searching all the queues and sets.
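
As a rough sketch of what this could look like from a client's perspective (the ENQUEUED command is only a proposal, and the helper below is hypothetical; it assumes the usual command-plus-JSON framing with a RESP-style single-line reply):

```go
package faktoryclient

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net"
	"strings"
)

// jobStatus asks the server whether the given jid is still enqueued, using
// the proposed ENQUEUED command. Expected replies per the proposal:
// "gone", "queued", or "dead" (sent as +gone, +queued, +dead).
// A real client would reuse a single buffered reader per connection.
func jobStatus(conn net.Conn, queue, jid string) (string, error) {
	if queue == "" {
		queue = "default" // proposed default when no queue is given
	}
	payload, err := json.Marshal(map[string]string{"queue": queue, "jid": jid})
	if err != nil {
		return "", err
	}
	if _, err := fmt.Fprintf(conn, "ENQUEUED %s\r\n", payload); err != nil {
		return "", err
	}
	line, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return "", err
	}
	// Strip the leading '+' and trailing CRLF from the reply.
	return strings.TrimSpace(strings.TrimPrefix(line, "+")), nil
}
```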

For further motivation and discussion, see the Gitter thread starting from https://gitter.im/contribsys/faktory?at=5a03413f7081b66876c7a6ae.

@andrewstucki

Hmm... I think that in order to support this (and to do so efficiently), we would have to store metadata for every job twice.

The problem is that the jid is part of the key mapping for all non-strict-time-based storage, i.e. jobs in the "working" or "failed" state, etc. (anything that uses rocksSortedSet...). However, in order to support prioritization, fast access, and linearly dequeuing all of the jobs, the keys used in the actual job queue are all sequences.

The only thing I can think of that would allow you to do this would be creating a secondary column family in Rocks to store job "states" and then doing some sort of synchronized update between that column, the queue, and all the other areas where we store job info.

That being said, I'm not quite sure what the point of storing state like this is in a work queue. I would imagine you would have much better control doing some sort of completion callback in your client itself--which is a pretty common pattern that I've used working with both Sidekiq and systems like Kafka. Otherwise, if you're looking for a mechanism to debug stuff or monitor the system, couldn't you just use the web UI?

@jonhoo (Collaborator, Author) commented Nov 8, 2017

I'm not sure what you mean by "storing state"? I'm not proposing that the system should store any more state than it already does. Rather, there should be a way to find out whether a given job is in a queue (i.e., has not yet been completed by a worker), or not. Callbacks are a pain, especially for simpler applications, because they require another messaging fabric to maintain information that the work server already has.

@andrewstucki

What I mean is that, as it stands, to check whether a job with a given jid is in the queue, you need to iterate over every key in the Rocks store, deserialize the job, and see if it has the same jid--and things get trickier because you'll probably want some level of concurrency control so that you aren't popping data during your scan. With that in mind, you'd have to introduce transaction controls in the queue.

In other words--in order to be efficient about the process and not slow down the rest of the system, you'd likely need to have a secondary data store that allows you to quickly retrieve jobs that are currently enqueued by jid.

I guess I still don't quite get your callback point. Why not just enqueue another job as a callback to process asynchronously? It seems a lot easier than scheduling another thread in your application that scans a stored set of jids periodically and possibly has to coordinate with other application instances.

@mperham (Collaborator) commented Nov 8, 2017

Callbacks would likely be implemented as with Sidekiq Pro's Batches: execute this other job when a group of jobs is done. Monitoring a single job could be done as a batch of 1, but that may seem odd.

@jonhoo (Collaborator, Author) commented Nov 9, 2017

@andrewstucki Consider the case where I have a UI for uploading and transcoding videos. The user uploads a video, and the UI should show when the video finished transcoding. The transcoding job is sent to Faktory. A trivial way to implement this is to have the client poll for when the job is no longer in the queues. Doing this with callbacks is significantly more complicated. You could do it by introducing another storage system where I record that the transcoding is finished, and then have the client poll that instead, but that introduces some real complexity into both the worker and the client that simply isn't necessary.

Your observation about jids being tricky to look up efficiently is a good one, and one that I do not have an answer to. It is essentially the same thing that makes #82 hard to do efficiently. I suspect this is something Faktory will have to deal with sooner or later, but I may be wrong. Including the job's queue should help a little with this (indeed, that was the motivation for including it), but maybe not enough if we expect some queues to be very long.

@andrewstucki

@jonhoo Definitely a good example for me to see your point. Well taken.

That said, I do think that the only way of doing this without running into performance issues/complications is to store information that tracks the current state (the things in [] in the state diagram I threw in #82) in another Rocks column family. Essentially you could store an entry that has the jid as its key, and data about the job's current state plus a reference to another column family's key as its value--this way you could get a direct reference to the job's key in the FIFO queue in a single lookup (a rough sketch of such an entry follows the list below).

The things that would make this difficult are:

  1. Adding additional writes to a secondary column for every state transition will have an unknown performance impact. Doing the writes in something like another goroutine would probably help, but would increase the code complexity.
  2. You'd have to come up with a good policy and implementation for purging old data from the column, which is potentially longer-lived than any of the current storage columns.

Along with this comes the approximate doubling of your storage space because of the need for duplicate entries for every job in the system.
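
To make that concrete, here's a rough sketch of what such a secondary entry might look like; the field names, key prefix, and JSON encoding are all made up for illustration and aren't actual Faktory storage code:

```go
package storage

import "encoding/json"

// jobState is a hypothetical value stored in a secondary "state" column
// family, keyed by jid, so an ENQUEUED-style lookup doesn't have to scan
// the sequence-keyed queue. All names here are illustrative only.
type jobState struct {
	State    string `json:"state"`     // e.g. "enqueued", "working", "dead"
	Queue    string `json:"queue"`     // queue the job currently lives in
	QueueKey []byte `json:"queue_key"` // sequence key of the job inside the FIFO queue
}

// stateEntry builds the key/value pair that would have to be rewritten on
// every state transition, alongside the primary job record.
func stateEntry(jid string, st jobState) (key, value []byte, err error) {
	value, err = json.Marshal(st)
	return []byte("state:" + jid), value, err
}
```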


I say this not to rule out the possibility of implementing this, but just to outline the inherent complexities that I see with doing this in a scalable way. I'm sure there are alternatives that I haven't thought about, but just figured I'd give my $0.02. Ultimately I'm not the maintainer of this project though, so it's really not up to me 😃 .

@jonhoo (Collaborator, Author) commented Nov 9, 2017

So, if I understand correctly, the reason this is tricky stems from the fact that we can't do cross-data-structure transactions (like you can in a traditional RDBMS). Well, then we'll need a protocol. Here's a proposal for one that I think would work (a minimal in-memory sketch follows the steps below). I don't know much about the exact data structures provided by RocksDB, but I believe it should be general enough to fit. This relies on the fact that Faktory does not (and cannot) provide exactly-once execution of jobs, and I believe the overhead should be minimal:

Keep two data structures: a heap of jids, and a map from jid to job info

On PUSH:

  1. Insert job into map.
  2. Insert jid into heap.

On FETCH:

  1. Pull the jid off the heap.
  2. Look up and send job info to worker.

On ACK:

  1. Remove job from map.

On FAIL:

  1. Insert jid into heap.

On recovery after failure:

  1. Walk heap, keep track of any found jobs in a local set.
  2. Walk map, push the jid of any job not in the set from step 1 onto the heap.
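
As a purely illustrative in-memory sketch of that bookkeeping (no persistence, no locking, and obviously not how a RocksDB-backed version would be written):

```go
package tracker

// tracker models the proposed bookkeeping: a FIFO of jids standing in for
// the persistent "heap", plus a map from jid to serialized job info.
type tracker struct {
	queue []string
	jobs  map[string][]byte
}

func newTracker() *tracker {
	return &tracker{jobs: make(map[string][]byte)}
}

// Push: (1) insert the job into the map, (2) insert the jid into the heap.
func (t *tracker) Push(jid string, job []byte) {
	t.jobs[jid] = job
	t.queue = append(t.queue, jid)
}

// Fetch: (1) pull a jid off the heap, (2) look up the job info to send to a
// worker. The map entry is only removed later, on Ack.
func (t *tracker) Fetch() (jid string, job []byte, ok bool) {
	for len(t.queue) > 0 {
		jid, t.queue = t.queue[0], t.queue[1:]
		if job, ok = t.jobs[jid]; ok {
			return jid, job, true
		}
	}
	return "", nil, false
}

// Ack removes the job from the map; Fail puts the jid back on the heap.
func (t *tracker) Ack(jid string)  { delete(t.jobs, jid) }
func (t *tracker) Fail(jid string) { t.queue = append(t.queue, jid) }

// Enqueued answers the proposed query: the job has not been acknowledged
// yet if its jid is still present in the map.
func (t *tracker) Enqueued(jid string) bool {
	_, ok := t.jobs[jid]
	return ok
}

// Recover walks the heap into a set, then re-pushes any job in the map
// whose jid is not already on the heap (recovery steps 1 and 2 above).
func (t *tracker) Recover() {
	seen := make(map[string]bool, len(t.queue))
	for _, jid := range t.queue {
		seen[jid] = true
	}
	for jid := range t.jobs {
		if !seen[jid] {
			t.queue = append(t.queue, jid)
		}
	}
}
```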

Let's see why this works:

  • If we fail during a PUSH (i.e., between 1 & 2), the job is in the map, but not the heap. It is therefore not found in recovery step 1, and thus added in step 2, leading to a consistent state.
  • If we fail during a FETCH (i.e., between 1 & 2), the job is again in the map, but not the heap, and no worker has it. This is equivalent to the PUSH case above.
  • If we fail after a FETCH, but before we hear the ACK/FAIL, the job is in the map, but not the heap, and will be put back by recovery step 2 again. Note that in this case, we may re-execute the job (since it was sent to a worker), but this is okay. When the job eventually completes, its leftover state is cleaned up correctly.
  • If we fail during recovery (in step 2, which is the only one that modifies state), this means some of the jobs in the map have been added to the heap. These will be found in step 1 of the next recovery, and will not be added again.

An aside about performance: the numbers from #80 suggest that Faktory can currently do ~10k ops/second. There is just no way we're limited by the client in the push test. On my laptop, two system calls (write command, read response), which is most of what a PUSH is, take ~1µs, which is 1M ops/s. This is far from the reported 10k/s, suggesting that Faktory already does something that is pretty slow; I don't know exactly what that is, though. Faktory also does two system calls for each PUSH, but somehow it spends 100x that time (10k/s = 100µs/op) adding a job to a heap. I believe it should be possible to manipulate data structures well within 100µs, even with transactional guarantees, so we'll probably want to look into why this is slow. However, if the current state of affairs is considered "good enough", then I think the proposed protocol will also be, as I don't think modifying the second data structure for some ops should incur much incremental overhead.

@andrewstucki

Quickly to your first point, and then moving on to your second point:

Point 1

It's not about cross-data-structure transactions; you have transactions that can write across multiple column families in Rocks. It's about keeping operations to the underlying Rocks storage fast--the more operations you introduce in a single high-level command, the slower your throughput is going to be. A PUSH operation would now go from issuing a single Put to the underlying Rocks storage to issuing two Puts, and all other operations would add an additional Put and possibly a Get if you need to deserialize the underlying "state" layer before overwriting it. That means you'd go from a single operation in ACK to potentially 3, two operations in FAIL to potentially 4, and three operations in FETCH to potentially 5.

Additionally, as I pointed out storage space would double.

You could alleviate a lot of this by doing all of these state look-ups in memory, but I'm not sure how that would work with restarts.

Also, I'm still unsure as to what purging old state would look like.

Point 2

So, I'm not quite sure how you're getting your estimates, but they seem fairly unrealistic IMO.

On top of network overhead, there's also:

  1. JSON serialization/deserialization overhead
  2. Conditional branching based off of message/queue payload
  3. Queue retrieval
  4. Conversion between Go and C structures
  5. Underlying RocksDB key storage/retrieval
  6. Persistence to the RocksDB WAL

I wrote up a couple of benchmarks to show some closer-to-raw performance without network overhead here. This is what I get.

➜  faktory git:(benchmarks) go test -run=Bench -bench=. ./server && go test -run=Bench -bench=. ./storage
goos: darwin
goarch: amd64
pkg: github.com/contribsys/faktory/server
BenchmarkFetch-8            	   50000	     25513 ns/op
BenchmarkFetchNoReserve-8   	  100000	     16128 ns/op
PASS
ok  	github.com/contribsys/faktory/server	4.126s
goos: darwin
goarch: amd64
pkg: github.com/contribsys/faktory/storage
BenchmarkQueuePerformance-8   	  300000	      4974 ns/op
BenchmarkPush-8               	  300000	      3642 ns/op
BenchmarkPop-8                	  200000	      8209 ns/op
PASS
ok  	github.com/contribsys/faktory/storage	5.407s
➜  faktory git:(benchmarks)

Also, from RocksDB's own benchmarking: a bulk sequential load of 1 billion keys with the WAL enabled finishing in 36 minutes works out to roughly ~460k writes a second.

Looking at the Faktory benchmark, the write performance is about half that. I'm not sure if that's due to fsync operations, bloom filter configuration, the size of each key/value, or even just the difference in specs between my laptop and the official Rocks benchmark machine. It's probably worth looking into, though.

The benchmark speed for Pop, I believe, is due to the fact that there are actually two Rocks operations being performed: a Get and a Delete. I'm not sure exactly what the official performance benchmarks would look like for these, since they're sequential Gets and Deletes, but in a hand-wavy way it makes sense to me that two operations would take about double the time that one would.

Next, looking at the Fetch benchmarks. From the looks of it, the extra "reservation" step (adding a job to the "working" store) takes 10µs, which means that purely iterating over the requested queues, running a Pop operation, and deserializing the job is taking about 16µs--about double what a pure Pop on a known queue with no deserialization takes.

Given that Go's internal JSON encoding library takes about 3µs just to serialize/deserialize simple structures, I'm not surprised that it takes that much longer to do a Fetch vs. a raw Pop.

What all of this tells me is that, even serialization aside, raw Push and Pop operations on byte arrays bottleneck the throughput of storing jobs to the 100ks, not millions. The additional serialization overhead and the multiple writes we do to move jobs into other persistent stores bottleneck the throughput of Fetch operations to the 10ks. If we take the raw numbers from my laptop and compare them with what I get as my "job processing" load test numbers (~6k/s for "pop"), and estimate the "fetch" operation as taking roughly half of that time (since we're doing a FETCH + ACK), then you have a FETCH over the network going at a sequential rate of about ~12k a second--roughly 3 times slower than the fetch benchmark above, which isn't run over the network.

Of course there are ways to speed fetches up including:

  1. Use a different serialization layer
  2. Tune Rocks more

But right now from what I can see network roundtrips are taking ~2/3 of the operation time.


I say all this to point out that expecting Faktory to do millions of operations a second is not realistic. If you look at Redis benchmarks, even doing parallel network requests against pure in-memory storage gives around 72k operations a second for a simple SET.

@mperham (Collaborator) commented Nov 9, 2017

Sidekiq.cr, an impl of Sidekiq in Crystal (which is a very fast language), can process about 15k jobs/sec with Redis on my laptop. The Ruby version can do about 5k/sec. While I'd love to beat that 15k number, I think it's unrealistic given Faktory's age; right now I'm focused on features and usability, not performance. If you want to process 100,000s of things per second, you should be looking at a Kafka cluster. Background jobs are naturally more "coarse" in my mind than a stream of data: think business transactions rather than processing individual logfile lines.

@jonhoo (Collaborator, Author) commented Nov 9, 2017

Point 1

It's about keeping operations to the underlying Rocks storage fast--the more operations you introduce in a single high-level command, the slower your throughput is going to be, i.e. a PUSH operation would now go from issuing a single Put to the underlying Rocks storage to issuing two Puts

I think this is where I'm not understanding you correctly. Doing two operations in a single transaction should not be twice as slow as doing a single operation in one transaction, because the WAL/durability overhead should be per transaction, not per operation. If Rocks writes and flushes each operation individually to disk, that means it's doing something silly.

all other operations would add an additional Put and possibly a Get if you need to deserialize the underlying "state" layer before overwriting it--that means you'd go from a single operation in ACK to potentially 3, two operations in Fail to potentially 4, and three operations in FETCH to potentially 5.

I'm not sure I understand this point. Any sensible database will either keep an in-memory copy of its state, or have data structures on disk that can be manipulated without deserializing them entirely. If the database had to deserialize the entire dataset off disk for every operation, that would make it cripplingly slow!

Additionally, as I pointed out storage space would double.

Where do you get "double" from? The proposed scheme only stores the jids twice, not the whole Job.

You could alleviate a lot of this by doing all of these state look-ups in memory, but I'm not sure how that would work with restarts.

The database should already perform operations on an in-memory structure first, and then flush changes as a batch to disk when a transaction commits. Doing more operations should, in theory, not lead to incurring significantly more disk activity.
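
To sketch what I mean against a hypothetical batched-write interface (not the real RocksDB bindings): both Puts below are part of one atomic write and therefore share a single WAL flush, so the extra bookkeeping should cost an in-memory operation rather than an extra fsync.

```go
package storage

// batch and store are hypothetical interfaces standing in for the real
// RocksDB bindings; they exist only to illustrate the batching argument.
type batch interface {
	Put(key, value []byte)
}

type store interface {
	// Write applies every operation recorded by apply atomically and
	// syncs the write-ahead log once for the whole batch.
	Write(apply func(b batch)) error
}

// push writes the primary queue entry and the secondary jid index in a
// single batch; the key prefixes here are made up for illustration.
func push(s store, jid string, job []byte) error {
	return s.Write(func(b batch) {
		b.Put([]byte("queue:"+jid), job)                // primary queue entry
		b.Put([]byte("state:"+jid), []byte("enqueued")) // secondary jid index
	})
}
```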

Also, I'm still unsure as to what purging old state would look like.

Not sure what you mean? The proposed scheme has no old state beyond dead jobs (just like in the current scheme).

Point 2

I think this discussion should likely be moved to #80, or to a new issue altogether, so I'll just respond briefly here. I completely agree that 1M/s should not be the goal for Faktory. My point was only that 10k seems low, and that I don't think writing to an additional data structure is going to have a significant impact. As you yourself observe, the overheads lie elsewhere.

The additional serialization overhead and multiple writes we do to move jobs into other persistent stores bottleneck the throughput of Fetch operations to the 10ks.

You showed in the benchmarks that serialization is ~3µs, and that RocksDB can do 460k writes/s. Neither of these suggest that we should be limited to 10k over a network.

But right now from what I can see network roundtrips are taking ~2/3 of the operation time.

60µs network roundtrip time on localhost? That seems like a lot...


@mperham I completely agree that performance should not be the focus of Faktory at the moment, nor in the foreseeable future. I was only making a performance argument to say that the cost of adding a secondary datastructure to support pending? is unlikely to make a meaningful difference to Faktory's overall performance.

@andrewstucki

Also, taking a look at how RabbitMQ compares with full confirmation and persistence (which is what Faktory does right now due to using the WAL and RESP-based responses), Faktory already has it beat.

In general I think it's kind of unfair to compare in-memory stores with systems that actually persist to disk for every operation. Assuming that there was an option to make the store in-memory only and bump up the parallelism of the workers I imagine that Faktory's throughput would go through the roof.

@akshaylb

+1

I understand the UI gives a great way to check up on the status of tasks, but there may be cases where we want to let a client know the status of a given job.

Without adding much complexity, isn't it possible to just return whether a certain jid is processed, enqueued, or failed?

@singingwolfboy

Any progress on this issue? If Faktory won't support this natively, it would be nice to have a documentation page that describes how a developer could implement it at the application level.

@mperham (Collaborator) commented Mar 10, 2019 via email

@mperham (Collaborator) commented Mar 13, 2020

Job Tracking is now in Faktory Enterprise and allows a client to poll for the state of a JID.

https://github.com/contribsys/faktory/wiki/Ent-Tracking

@mperham closed this as completed Mar 13, 2020