-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make BigQueue class more resilient. #2925
Conversation
/test connector=source-postgres
|
/test connector=destination-postgres
|
'}'; | ||
} | ||
|
||
private interface NonBlockingOp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NonBlockingOp
/ BlockingOp
aren't very descriptive imo and are identical in terms of a specific Lock
. Really they're just safely locking around a block.
We could instead have:
private class LockedSupplier<T> {
private final Lock lock;
private final Supplier<T> delegate;
public LockedSupplier(Lock lock, Supplier<T> delegate) {
this.lock = lock;
this.delegate = delegate;
}
static <T> T get(Lock lock, Supplier<T> supplier) {
lock.lock();
try {
return supplier.get();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
And then we would call it with:
return new LockedSupplier(lock.writeLock(), () -> {
try {
List<byte[]> elements = new ArrayList<>();
while (!queue.isEmpty()) {
elements.add(queue.dequeue());
}
for (byte[] e : elements) {
queue.enqueue(e);
}
return elements.iterator();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).get();
or similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this breaks as soon as the contents of the queue is larger than memory. i think we expect the contents of the queue to be larger than memory though, so i think that's a deal breaker. see more detailed comment below.
} | ||
return poll; | ||
for (byte[] e : elements) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not sure that this is going to work with any sizable amount of data. once the amount of data in the queue is bigger than memory this is going to OOM. the whole point of using the disk queue though is because we expect to the queue to be bigger than memory. so if we are going to make it a constraint now that the queue has to be smaller than memory, then we've defeated the purpose of the queue. lmk if i'm misunderstanding something!
even if this doesn't oom, this does seems super expensive.
i think we need to find a different path here. happy to brainstorm a big more about what we should do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sounds like we just want to prevent unexpected data loss from calling .iterator()
or printing it?
Can we just have those actions throw exceptions? It just takes away the footguns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing an exception was my first instinct so happy to do that; practical to disable a dangerous behavior and fix it later.
I added the locking stuff as an escape hatch in case a dev really needs to print this out.
Briefly spoke to Charles - who is in favor of switching out the inherited classes to cut this out at the source. I think that's the right approach too. What if we throw an exception for now, and add a todo to do that? The exception provides a small safeguard, which is slightly better than the huge pothole we have now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that works for me. doing just the exception for now to disable the footgun seems like the right level of effort.
What
Today, printing out the BigQueue class or calling its iterator empties it.
This is because printing out the structure invokes the iterator, which actually dequeues the records. The PR seeks to correct this behaviour.
This PR also changes the behavior where iterating over the queue returns an in-memory copy of the queue. This does add the caveat that this iteration can cause a OOM exception, since the current implementation attempts to read the entire queue into memory.
Added tests to the BigQueue testing class, but still running the integration test for sanity check.
How
toString
so calling thetoString
function prints the queue's size and not its contents.iterator
to retrieve the entire contents of the queue, and return an iterator to an in-memory list. Existing contents are then written back into the queue.offer
anditerator
are blocking and other methods are not. This is required since offer and iterator modify the queue's contents/Recommended reading order
BigQueue.java
BigQueueTest.java