Skip to content

Commit

Permalink
Moving the generic WindowedSubscriber to core-amqp (#39955)
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy committed Apr 29, 2024
1 parent 87e28e1 commit 61e0271
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 156 deletions.
2 changes: 2 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- `WindowedSubscriber` to translate the asynchronous stream of events or messages to `IterableStream` ([38705](https://github.com/Azure/azure-sdk-for-java/pull/38705)).

### Breaking Changes

### Bugs Fixed
Expand Down
5 changes: 4 additions & 1 deletion sdk/core/azure-core-amqp/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@

<Match>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" />
<Class name="com.azure.core.amqp.implementation.ReactorExecutor" />
<Or>
<Class name="com.azure.core.amqp.implementation.ReactorExecutor" />
<Class name="com.azure.core.amqp.implementation.WindowedSubscriber$WindowWork" />
</Or>
</Match>

<Match>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus;
package com.azure.core.amqp.implementation;

import com.azure.core.util.IterableStream;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -43,7 +43,7 @@
*
* @param <T> the type of items in the window.
*/
final class WindowedSubscriber<T> extends BaseSubscriber<T> {
public final class WindowedSubscriber<T> extends BaseSubscriber<T> {
private static final String WORK_ID_KEY = "workId";
private static final String UPSTREAM_REQUESTED_KEY = "requested";
private static final String DIFFERENCE_KEY = "difference";
Expand All @@ -62,25 +62,21 @@ final class WindowedSubscriber<T> extends BaseSubscriber<T> {

private volatile Subscription s;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class,
Subscription.class,
"s");
private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Subscription> S
= AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class, Subscription.class, "s");
private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<WindowedSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(WindowedSubscriber.class, "requested");
private static final AtomicLongFieldUpdater<WindowedSubscriber> REQUESTED
= AtomicLongFieldUpdater.newUpdater(WindowedSubscriber.class, "requested");
private volatile int wip;
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<WindowedSubscriber> WIP =
AtomicIntegerFieldUpdater.newUpdater(WindowedSubscriber.class, "wip");
private static final AtomicIntegerFieldUpdater<WindowedSubscriber> WIP
= AtomicIntegerFieldUpdater.newUpdater(WindowedSubscriber.class, "wip");
private volatile boolean done;
private volatile Throwable error;
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Throwable> ERROR =
AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class,
Throwable.class,
"error");
private static final AtomicReferenceFieldUpdater<WindowedSubscriber, Throwable> ERROR
= AtomicReferenceFieldUpdater.newUpdater(WindowedSubscriber.class, Throwable.class, "error");

/**
* Creates the subscriber to split upstream sequence into a series of windows.
Expand All @@ -89,7 +85,8 @@ final class WindowedSubscriber<T> extends BaseSubscriber<T> {
* @param terminatedMessage the message to include in the error when closing a window after the subscriber termination.
* @param options the optional configurations for the subscriber.
*/
WindowedSubscriber(Map<String, Object> loggingContext, String terminatedMessage, WindowedSubscriberOptions<T> options) {
public WindowedSubscriber(Map<String, Object> loggingContext, String terminatedMessage,
WindowedSubscriberOptions<T> options) {
this.loggingContext = Objects.requireNonNull(loggingContext, "'loggingContext' cannot be null.");
Objects.requireNonNull(terminatedMessage, "'terminatedMessage' cannot be null.");
this.terminatedMessage = terminatedMessage + " (Reason: %s)";
Expand Down Expand Up @@ -130,7 +127,8 @@ EnqueueResult<T> enqueueRequestImpl(int windowSize, Duration windowTimeout) {
throw logger.logExceptionAsError(new NullPointerException("'windowTimeout' cannot be null."));
}
if (windowTimeout.isNegative() || windowTimeout.isZero()) {
throw logger.logExceptionAsError(new IllegalArgumentException("'windowTimeout' period must be strictly positive."));
throw logger
.logExceptionAsError(new IllegalArgumentException("'windowTimeout' period must be strictly positive."));
}

final long workId = idGenerator.getAndIncrement();
Expand Down Expand Up @@ -226,7 +224,7 @@ private void drain() {
private void drainLoop() {
int missed = 1;
// Begin: drain-loop
for (; ;) {
for (;;) {
if (isDoneOrCanceled()) {
if (cleanCloseStreamingWindowOnTerminate) {
WindowWork<T> w0 = workQueue.peek();
Expand All @@ -249,7 +247,8 @@ private void drainLoop() {
} else if (w0.isCanceled()) {
w0.terminate(WorkTerminalState.CANCELED);
} else {
throw w0.getLogger().log(new IllegalStateException("work with unexpected state in timeout-cancel queue."));
throw w0.getLogger()
.log(new IllegalStateException("work with unexpected state in timeout-cancel queue."));
}
}
}
Expand Down Expand Up @@ -392,9 +391,8 @@ private void initWorkOnce(WindowWork<T> w) {
final long workDemand = w.getDemand();
final long difference = workDemand - requested;

final LoggingEventBuilder logger = w.getLogger()
.addKeyValue(UPSTREAM_REQUESTED_KEY, requested)
.addKeyValue(DIFFERENCE_KEY, difference);
final LoggingEventBuilder logger
= w.getLogger().addKeyValue(UPSTREAM_REQUESTED_KEY, requested).addKeyValue(DIFFERENCE_KEY, difference);

if (difference > 0) {
Operators.addCap(REQUESTED, this, difference);
Expand Down Expand Up @@ -434,7 +432,7 @@ private Throwable getTerminalError() {
*
* @param <T> the type of items in the windows that {@link WindowedSubscriber} produces.
*/
static final class WindowedSubscriberOptions<T> {
public static final class WindowedSubscriberOptions<T> {
private Consumer<T> releaser;
private Duration nextItemTimout;
private Function<Flux<T>, Flux<T>> windowDecorator;
Expand All @@ -443,7 +441,7 @@ static final class WindowedSubscriberOptions<T> {
/**
* Creates the optional configurations for {@link WindowedSubscriber}.
*/
WindowedSubscriberOptions() {
public WindowedSubscriberOptions() {
this.releaser = null;
this.nextItemTimout = null;
this.windowDecorator = null;
Expand Down Expand Up @@ -499,7 +497,7 @@ private boolean shouldCleanCloseStreamingWindowOnTerminate() {
* @param releaser the callback to drop the items in the absence of a window.
* @return the updated {@link WindowedSubscriberOptions} object.
*/
WindowedSubscriberOptions<T> setReleaser(Consumer<T> releaser) {
public WindowedSubscriberOptions<T> setReleaser(Consumer<T> releaser) {
this.releaser = Objects.requireNonNull(releaser, "'releaser' cannot be null.");
return this;
}
Expand All @@ -514,7 +512,7 @@ WindowedSubscriberOptions<T> setReleaser(Consumer<T> releaser) {
* @param nextItemTimout the next window item timeout.
* @return the updated {@link WindowedSubscriberOptions} object.
*/
WindowedSubscriberOptions<T> setNextItemTimeout(Duration nextItemTimout) {
public WindowedSubscriberOptions<T> setNextItemTimeout(Duration nextItemTimout) {
this.nextItemTimout = Objects.requireNonNull(nextItemTimout, "'nextItemTimout' cannot be null.");
return this;
}
Expand All @@ -525,7 +523,7 @@ WindowedSubscriberOptions<T> setNextItemTimeout(Duration nextItemTimout) {
* @param windowDecorator the window decorator.
* @return the updated {@link WindowedSubscriberOptions} object.
*/
WindowedSubscriberOptions<T> setWindowDecorator(Function<Flux<T>, Flux<T>> windowDecorator) {
public WindowedSubscriberOptions<T> setWindowDecorator(Function<Flux<T>, Flux<T>> windowDecorator) {
this.windowDecorator = Objects.requireNonNull(windowDecorator, "'windowDecorator' cannot be null.");
return this;
}
Expand Down Expand Up @@ -562,7 +560,7 @@ WindowedSubscriberOptions<T> setWindowDecorator(Function<Flux<T>, Flux<T>> windo
*
* @return the updated {@link WindowedSubscriberOptions} object.
*/
WindowedSubscriberOptions<T> cleanCloseStreamingWindowOnTerminate() {
public WindowedSubscriberOptions<T> cleanCloseStreamingWindowOnTerminate() {
this.cleanCloseStreamingWindowOnTerminate = true;
return this;
}
Expand Down Expand Up @@ -721,17 +719,16 @@ private Flux<T> windowFlux(boolean drainOnCancel) {
final Function<Flux<T>, Flux<T>> decorator = parent.windowDecorator;
final Flux<T> flux = decorator != null ? decorator.apply(sink.asFlux()) : sink.asFlux();
if (drainOnCancel) {
return flux
.doFinally(s -> {
if (s == SignalType.CANCEL) {
isCanceled.set(true);
final WindowWork<T> w = this;
// It's very likely that the cancel signaling happened from application (user) thread.
// Offload the responsibility of drain-loop run (for rolling to next work) to worker thread,
// and free up the application thread.
Schedulers.boundedElastic().schedule(() -> parent.postTimedOutOrCanceledWork(w));
}
});
return flux.doFinally(s -> {
if (s == SignalType.CANCEL) {
isCanceled.set(true);
final WindowWork<T> w = this;
// It's very likely that the cancel signaling happened from application (user) thread.
// Offload the responsibility of drain-loop run (for rolling to next work) to worker thread,
// and free up the application thread.
Schedulers.boundedElastic().schedule(() -> parent.postTimedOutOrCanceledWork(w));
}
});
} else {
return flux;
}
Expand Down Expand Up @@ -768,8 +765,7 @@ private EmitNextResult tryEmitNext(T item) {
if (emitResult == Sinks.EmitResult.OK) {
return EmitNextResult.OK;
} else {
withPendingKey(logger.atError()).addKeyValue(EMIT_RESULT_KEY, emitResult)
.log("Could not emit-next.");
withPendingKey(logger.atError()).addKeyValue(EMIT_RESULT_KEY, emitResult).log("Could not emit-next.");
return EmitNextResult.SINK_ERROR;
}
}
Expand All @@ -790,17 +786,13 @@ private void terminate(WorkTerminalState terminalState) {
timers.dispose();
} finally {
if (terminalState == WorkTerminalState.SINK_ERROR) {
withPendingKey(logger.atWarning())
.addKeyValue("reason", "sink-error")
.log(TERMINATING_WORK);
withPendingKey(logger.atWarning()).addKeyValue("reason", "sink-error").log(TERMINATING_WORK);
return;
}

if (terminalState == WorkTerminalState.CANCELED) {
assertCondition(isCanceled(), terminalState);
withPendingKey(logger.atWarning())
.addKeyValue("reason", "sink-canceled")
.log(TERMINATING_WORK);
withPendingKey(logger.atWarning()).addKeyValue("reason", "sink-canceled").log(TERMINATING_WORK);
return;
}

Expand All @@ -824,13 +816,11 @@ private void terminate(WorkTerminalState terminalState) {
final TimeoutReason reason = timeoutReason.get();
final Throwable e = reason.getError();
if (e != null) {
withPendingKey(logger.atWarning())
.addKeyValue("reason", reason.getMessage())
withPendingKey(logger.atWarning()).addKeyValue("reason", reason.getMessage())
.log(TERMINATING_WORK, e);
closeWindow(e);
} else {
withPendingKey(logger.atVerbose())
.addKeyValue("reason", reason.getMessage())
withPendingKey(logger.atVerbose()).addKeyValue("reason", reason.getMessage())
.log(TERMINATING_WORK);
closeWindow();
}
Expand All @@ -847,8 +837,7 @@ private void terminate(WorkTerminalState terminalState) {

if (terminalState == WorkTerminalState.PARENT_TERMINAL_CLEAN_CLOSE) {
assertCondition(parent.isDoneOrCanceled() && isStreaming(), terminalState);
withPendingKey(logger.atWarning())
.addKeyValue("reason", "terminal-clean-close")
withPendingKey(logger.atWarning()).addKeyValue("reason", "terminal-clean-close")
.log(TERMINATING_WORK);
closeWindow();
return;
Expand All @@ -865,8 +854,7 @@ private void terminate(WorkTerminalState terminalState) {
private Disposable beginTimeoutTimer() {
final Disposable disposable = Mono.delay(timeout)
.publishOn(Schedulers.boundedElastic())
.subscribe(__ -> onTimeout(TimeoutReason.TIMEOUT),
e -> onTimeout(TimeoutReason.timeoutErrored(e)));
.subscribe(__ -> onTimeout(TimeoutReason.TIMEOUT), e -> onTimeout(TimeoutReason.timeoutErrored(e)));
return disposable;
}

Expand Down Expand Up @@ -1022,7 +1010,8 @@ private static <T> Sinks.Many<T> createSink() {
*/
private static final class TimeoutReason {
static final TimeoutReason TIMEOUT = new TimeoutReason("Timeout occurred.", null);
static final TimeoutReason TIMEOUT_NEXT_ITEM = new TimeoutReason("Timeout between the messages occurred.", null);
static final TimeoutReason TIMEOUT_NEXT_ITEM
= new TimeoutReason("Timeout between the messages occurred.", null);

private final String message;
private final Throwable error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public Mono<Void> terminateAndAwaitForDispositionsInProgressToComplete() {
final List<Mono<Void>> workMonoList = new ArrayList<>();
final StringJoiner deliveryTags = new StringJoiner(", ");
for (DispositionWork work : pendingDispositions.values()) {
if (work == null || work.hasTimedout()) {
if (work == null || work.hasTimedOut()) {
continue;
}
if (work.getDesiredState() instanceof TransactionalState) {
Expand Down Expand Up @@ -475,7 +475,7 @@ private void completeDispositionWorksOnTimeout(String callSite) {
final StringJoiner deliveryTags = new StringJoiner(", ");

pendingDispositions.forEach((deliveryTag, work) -> {
if (work == null || !work.hasTimedout()) {
if (work == null || !work.hasTimedOut()) {
return;
}

Expand Down Expand Up @@ -697,7 +697,7 @@ Throwable getRejectedOutcomeError() {
*
* @return {@code true} if the work has timed out, {@code false} otherwise.
*/
boolean hasTimedout() {
boolean hasTimedOut() {
return expirationTime != null && expirationTime.isBefore(Instant.now());
}

Expand Down
Loading

0 comments on commit 61e0271

Please sign in to comment.