-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(Dgraph): Add task queue implementation #7716
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
4059039 to
53a0ac2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good but there are two pieces missing.
- Routing of the requests - We will do this in a separate PR.
- Periodic cleanup of the tasks - This needs to be done in this PR.
- Update ristretto - We need the persistent mmap tree.
2 and 3 need to go in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change would also mean that only a single export/backup can run at a time, right?
|
Correct. To avoid overloading Dgraph, there is only a single thread for executing long-running tasks like exports or backups. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only looked at the GraphQL API bit and that looks fine to me
Reviewable status: 0 of 15 files reviewed, 12 unresolved discussions (waiting on @ajeetdsouza, @manishrjain, @NamanJain8, and @vvbalaji-dgraph)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 12 files at r1, 9 of 9 files at r2.
Reviewable status: all files reviewed, 12 unresolved discussions (waiting on @ajeetdsouza, @manishrjain, @NamanJain8, and @vvbalaji-dgraph)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 5 of 12 files at r1, 7 of 9 files at r2, 3 of 3 files at r3.
Reviewable status: all files reviewed, 23 unresolved discussions (waiting on @ajeetdsouza, @jarifibrahim, @NamanJain8, and @vvbalaji-dgraph)
protos/pb.proto, line 638 at r3 (raw file):
repeated string predicates = 10; bool force_full = 11;
force_full is just since_ts = 0. No need to have a separate field for this.
worker/queue.go, line 113 at r1 (raw file):
Previously, jarifibrahim (Ibrahim Jarif) wrote…
Use return.
Add another case with a hourly ticker. Every hour, it can call the cleanup.
worker/queue.go, line 52 at r3 (raw file):
log: z.NewTree(), logMu: new(sync.Mutex), shouldCleanup: time.NewTicker(taskTtl),
Doesn't need to be here. This can be a local variable in the cleanup func.
worker/queue.go, line 107 at r3 (raw file):
func (t tasks) cleanup() { select { case <-t.shouldCleanup.C:
Remove the shouldCleanup.
worker/queue.go, line 128 at r3 (raw file):
case <-x.ServerCloser.HasBeenClosed(): return case task = <-t.queue:
case ...:
handleTask(...)
worker/queue.go, line 133 at r3 (raw file):
// Fetch the task from the log. If the task isn't found, this means it has expired (older // than taskTtl. meta := TaskMeta(t.Get(task.id))
Have a function for handling the task. So, you can return from it instead of "continuing".
worker/queue.go, line 162 at r3 (raw file):
// QueueBackup enqueues a backup request in the task queue. func (t tasks) QueueBackup(req *pb.BackupRequest) (uint64, error) {
Why have this func?
worker/queue.go, line 167 at r3 (raw file):
// QueueExport enqueues an export request in the task queue. func (t tasks) QueueExport(req *pb.ExportRequest) (uint64, error) {
Why have this func?
worker/queue.go, line 173 at r3 (raw file):
// queueTask enqueues a request of any type. // Don't use this function directly. func (t tasks) queueTask(req interface{}) (uint64, error) {
Make this public, and return error if you get something wrong (you could panic).
worker/queue.go, line 182 at r3 (raw file):
} // Unable to generate a unique random number. if attempt >= 8 {
Don't really need this. You can loop infinitely.
worker/queue.go, line 198 at r3 (raw file):
select { case t.queue <- task: t.cleanup()
Don't do this.
worker/queue.go, line 203 at r3 (raw file):
default: return 0, fmt.Errorf("too many pending tasks, please try again later") }
Instead of returning immediately, wait for a few seconds. Do it in a loop or something.
for i := 0; i < 3; i++ {
// check task status. If running, return.
// if failed, return
// if queued, sleep.
}
return latest status.
Ideally, we wait for a few seconds, and see what happens with the task. If it starts running immediately, then we can tell user it's running. If it fails quickly, we can tell that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 23 of 23 files at r4.
Reviewable status: all files reviewed, 16 unresolved discussions (waiting on @ajeetdsouza, @jarifibrahim, @NamanJain8, and @vvbalaji-dgraph)
protos/pb.proto, line 629 at r4 (raw file):
message BackupRequest { uint64 read_ts = 1; uint64 since_ts = 2;
Wrong indentation.
worker/queue.go, line 118 at r4 (raw file):
// worker loops forever, running queued tasks one at a time. Any returned errors are logged. func (t tasks) worker() { shouldCleanup := time.NewTicker(time.Hour)
defer shouldCleanup.Stop()
This PR adds an asynchronous persistent task queue for long-running Dgraph operations (backups and exports).
/adminAPI./adminAPI will forward a task query to the correct Alpha./admin/{backup,export}APIs have been removed, since this change would have broken their APIs anyway.Query:
Response:
{ "data": { "export": { "response": { "code": "Success", "message": "Export queued with ID 0x3fc8a834aff7a9f7" }, "taskId": "0x3fc8a834aff7a9f7" } }, "extensions": { "tracing": { "duration": 1001284394, "endTime": "2021-05-03T15:05:17.40714509+05:30", "startTime": "2021-05-03T15:05:16.405861186+05:30", "version": 1 } } }Query:
Response:
{ "data": { "task": { "kind": "Export", "lastUpdated": "2021-05-03T15:05:16+05:30", "status": "Success" } }, "extensions": { "tracing": { "duration": 278797, "endTime": "2021-05-03T15:07:00.516724037+05:30", "startTime": "2021-05-03T15:07:00.516445729+05:30", "version": 1 } } }This change is