Skip to content

Commit

Permalink
MultiFromBlockingInputStream RC fix (#5061)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec authored Oct 3, 2022
1 parent 50548e8 commit 45e0bcc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,8 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntSupplier;

class MultiFromBlockingInputStream extends MultiFromInputStream {
Expand Down Expand Up @@ -68,9 +68,8 @@ public Multi<ByteBuffer> withByteBufferSize(final int bufferSize) {
static final class InputStreamSubscription extends MultiFromInputStream.InputStreamSubscription {

private final ExecutorService executorService;
private final LinkedBlockingQueue<Runnable> submitQueue = new LinkedBlockingQueue<>();
private Lock lck = new ReentrantLock();

private final AtomicBoolean draining = new AtomicBoolean(false);

InputStreamSubscription(Flow.Subscriber<? super ByteBuffer> downstream,
InputStream inputStream,
Expand All @@ -81,24 +80,14 @@ static final class InputStreamSubscription extends MultiFromInputStream.InputStr
}

protected void trySubmit(long n) {
submitQueue.add(() -> {
submit(n);
drainSubmitQueue();
});
drainSubmitQueue();
}

private void drainSubmitQueue() {
if (!draining.getAndSet(true)) {
executorService.submit(() -> {
try {
Runnable job = submitQueue.poll();
if (job != null) {
executorService.submit(job);
}
lck.lock();
submit(n);
} finally {
draining.set(false);
lck.unlock();
}
}
});
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
* Copyright (c) 2020, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,5 +61,4 @@ protected Multi<ByteBuffer> getPublisher(InputStream is) {
.byteBufferSize(BUFFER_SIZE)
.build();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
* Copyright (c) 2021, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,7 +31,7 @@ public class MultiFromTrustedInputStreamTckTest extends FlowPublisherVerificatio
static final int BUFFER_SIZE = 4;

public MultiFromTrustedInputStreamTckTest() {
super(new TestEnvironment(50));
super(new TestEnvironment(500));
}

@Override
Expand Down

0 comments on commit 45e0bcc

Please sign in to comment.