Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public class ConsumerConstant {
public static final String CONSUME_LISTENER_KEY = "consume-listener";

public static final String AUTO_POLL_INTERVAL_MS_KEY = "auto-poll-interval-ms";
public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 5_000L;
public static final long AUTO_POLL_INTERVAL_MS_MIN_VALUE = 500L;
public static final long AUTO_POLL_INTERVAL_MS_DEFAULT_VALUE = 100L;

public static final String AUTO_POLL_TIMEOUT_MS_KEY = "auto-poll-timeout-ms";
public static final long AUTO_POLL_TIMEOUT_MS_DEFAULT_VALUE = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ abstract class SubscriptionConsumer implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConsumer.class);

private static final long SLEEP_NS = 1_000_000_000L;
private static final long SLEEP_NS = 100_000_000L; // 100ms

private final String username;
private final String password;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ private SubscriptionPushConsumer(
this.ackStrategy = ackStrategy;
this.consumeListener = consumeListener;

this.autoPollIntervalMs =
Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
this.autoPollTimeoutMs =
Math.max(autoPollTimeoutMs, ConsumerConstant.AUTO_POLL_TIMEOUT_MS_MIN_VALUE);
}
Expand Down Expand Up @@ -290,8 +289,7 @@ public Builder consumeListener(final ConsumeListener consumeListener) {
}

public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
this.autoPollIntervalMs =
Math.max(autoPollIntervalMs, ConsumerConstant.AUTO_POLL_INTERVAL_MS_MIN_VALUE);
this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class SubscriptionCoordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionCoordinator.class);

private final ConfigManager configManager;

// NEVER EXPOSE THIS DIRECTLY TO THE OUTSIDE
private final SubscriptionInfo subscriptionInfo;

private final PipeTaskCoordinatorLock coordinatorLock;
Expand Down Expand Up @@ -122,7 +124,9 @@ public void stopSubscriptionMetaSync() {
subscriptionMetaSyncer.stop();
}

/** Caller should ensure that the method is called in the lock {@link #tryLock}. */
/**
* Caller should ensure that the method is called in the write lock of {@link #subscriptionInfo}.
*/
public void updateLastSyncedVersion() {
subscriptionInfo.updateLastSyncedVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void cleanup() {

public SubscriptionEvent poll(final String consumerId) {
if (prefetchingQueue.isEmpty()) {
tryPrefetch();
tryPrefetch(true);
}

final long size = prefetchingQueue.size();
Expand Down Expand Up @@ -140,14 +140,27 @@ public SubscriptionEvent poll(final String consumerId) {
public abstract void executePrefetch();

/**
* prefetch at most one {@link SubscriptionEvent} from {@link
* Prefetch at most one {@link SubscriptionEvent} from {@link
* SubscriptionPrefetchingQueue#inputPendingQueue} to {@link
* SubscriptionPrefetchingQueue#prefetchingQueue}
* SubscriptionPrefetchingQueue#prefetchingQueue}.
*
* <p>It will continuously attempt to prefetch and generate a {@link SubscriptionEvent} until
* {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty.
*
* @param trySealBatchIfEmpty {@code true} if {@link SubscriptionPrefetchingQueue#trySealBatch} is
* called when {@link SubscriptionPrefetchingQueue#inputPendingQueue} is empty, {@code false}
* otherwise
*/
protected void tryPrefetch() {
Event event;
while (Objects.nonNull(
event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll()))) {
protected void tryPrefetch(final boolean trySealBatchIfEmpty) {
while (!inputPendingQueue.isEmpty()) {
final Event event = UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
if (Objects.isNull(event)) {
// The event will be null in two cases:
// 1. The inputPendingQueue is empty.
// 2. The tsfile event has been deduplicated.
continue;
}

if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Subscription: SubscriptionPrefetchingQueue {} only support prefetch EnrichedEvent. Ignore {}.",
Expand All @@ -169,26 +182,35 @@ protected void tryPrefetch() {

if (event instanceof TabletInsertionEvent) {
if (onEvent((TabletInsertionEvent) event)) {
break;
return;
}
} else if (event instanceof PipeTsFileInsertionEvent) {
continue;
}

if (event instanceof PipeTsFileInsertionEvent) {
if (onEvent((PipeTsFileInsertionEvent) event)) {
break;
}
} else {
// TODO:
// - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
// - UserDefinedEnrichedEvent: ignored?
// - Others: events related to meta sync, safe to ignore
LOGGER.info(
"Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.",
this,
event);
if (trySealBatch()) {
break;
return;
}
continue;
}

// TODO:
// - PipeHeartbeatEvent: ignored? (may affect pipe metrics)
// - UserDefinedEnrichedEvent: ignored?
// - Others: events related to meta sync, safe to ignore
LOGGER.info(
"Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.",
this,
event);
if (trySealBatch()) {
return;
}
}

// At this moment, the inputPendingQueue is empty.
if (trySealBatchIfEmpty) {
trySealBatch();
}
}

/**
Expand Down Expand Up @@ -222,6 +244,7 @@ public boolean ack(final String consumerId, final SubscriptionCommitContext comm
}

if (event.isCommitted()) {
event.cleanup();
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void cleanup() {

@Override
public void executePrefetch() {
super.tryPrefetch();
super.tryPrefetch(false);
this.serializeEventsInQueue();
}

Expand Down Expand Up @@ -114,13 +114,18 @@ private boolean onEventInternal(final EnrichedEvent event) {

@Override
protected boolean trySealBatch() {
final AtomicBoolean result = new AtomicBoolean(false);
currentBatchRef.getAndUpdate(
(batch) -> {
sealBatch(batch);
return new SubscriptionPipeTabletEventBatch(
BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
if (batch.shouldEmit()) {
sealBatch(batch);
result.set(true);
return new SubscriptionPipeTabletEventBatch(
BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES);
}
return batch;
});
return true;
return result.get();
}

private void sealBatch(final SubscriptionPipeTabletEventBatch batch) {
Expand Down
Loading