Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -82,6 +83,12 @@ public abstract class SubscriptionPrefetchingQueue {
* SubscriptionPrefetchingQueue#ack}, etc. However, it does not enforce mutual exclusion among the
* other operations themselves.
*
* <p>Under the premise of obtaining this lock, to avoid inconsistencies, updates to the {@link
* SubscriptionEvent} in both {@link SubscriptionPrefetchingQueue#prefetchingQueue} and {@link
* SubscriptionPrefetchingQueue#inFlightEvents} MUST be performed within the {@link
* ConcurrentHashMap#compute} method of inFlightEvents in the {@link
* SubscriptionPrefetchingQueue}.
*
* <p>This lock is created with fairness set to true, which means threads acquire the lock in the
* order they requested it, to avoid thread starvation.
*/
Expand Down Expand Up @@ -404,50 +411,60 @@ public boolean ack(final String consumerId, final SubscriptionCommitContext comm
*/
private boolean ackInternal(
final String consumerId, final SubscriptionCommitContext commitContext) {
final SubscriptionEvent event = inFlightEvents.get(new Pair<>(consumerId, commitContext));
if (Objects.isNull(event)) {
LOGGER.warn(
"Subscription: subscription commit context {} does not exist, it may have been committed or something unexpected happened, prefetching queue: {}",
commitContext,
this);
return false;
}

if (event.isCommitted()) {
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
event,
commitContext,
this);
return false;
}

if (!event.isCommittable()) {
LOGGER.warn(
"Subscription: subscription event {} is not committable, subscription commit context {}, prefetching queue: {}",
event,
commitContext,
this);
return false;
}
final AtomicBoolean acked = new AtomicBoolean(false);
inFlightEvents.compute(
new Pair<>(consumerId, commitContext),
(key, ev) -> {
if (Objects.isNull(ev)) {
LOGGER.warn(
"Subscription: subscription commit context {} does not exist, it may have been committed or something unexpected happened, prefetching queue: {}",
commitContext,
this);
return null;
}

if (ev.isCommitted()) {
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
ev,
commitContext,
this);
return ev;
}

if (ev.isCommitted()) {
LOGGER.warn(
"Subscription: subscription event {} is committed, subscription commit context {}, prefetching queue: {}",
ev,
commitContext,
this);
// clean up committed event
ev.cleanUp();
return null; // remove this entry
}

// check if a consumer acks event from another consumer group...
final String consumerGroupId = commitContext.getConsumerGroupId();
if (!Objects.equals(consumerGroupId, brokerId)) {
LOGGER.warn(
"inconsistent consumer group when acking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, commit it anyway...",
brokerId,
consumerGroupId,
consumerId,
commitContext,
this);
}

// check if a consumer acks event from another consumer group...
final String consumerGroupId = commitContext.getConsumerGroupId();
if (!Objects.equals(consumerGroupId, brokerId)) {
LOGGER.warn(
"inconsistent consumer group when acking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, commit it anyway...",
brokerId,
consumerGroupId,
consumerId,
commitContext,
this);
}
ev.ack();
ev.recordCommittedTimestamp(); // now committed
acked.set(true);

event.ack();
event.recordCommittedTimestamp(); // now committed
// clean up committed event
ev.cleanUp();
return null; // remove this entry
});

// no need to update inFlightEvents
return true;
return acked.get();
}

/**
Expand All @@ -467,31 +484,38 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com
*/
public boolean nackInternal(
final String consumerId, final SubscriptionCommitContext commitContext) {
final SubscriptionEvent event = inFlightEvents.get(new Pair<>(consumerId, commitContext));
if (Objects.isNull(event)) {
LOGGER.warn(
"Subscription: subscription commit context [{}] does not exist, it may have been committed or something unexpected happened, prefetching queue: {}",
commitContext,
this);
return false;
}
final AtomicBoolean nacked = new AtomicBoolean(false);
inFlightEvents.compute(
new Pair<>(consumerId, commitContext),
(key, ev) -> {
if (Objects.isNull(ev)) {
LOGGER.warn(
"Subscription: subscription commit context [{}] does not exist, it may have been committed or something unexpected happened, prefetching queue: {}",
commitContext,
this);
return null;
}

// check if a consumer nacks event from another consumer group...
final String consumerGroupId = commitContext.getConsumerGroupId();
if (!Objects.equals(consumerGroupId, brokerId)) {
LOGGER.warn(
"inconsistent consumer group when nacking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, commit it anyway...",
brokerId,
consumerGroupId,
consumerId,
commitContext,
this);
}

// check if a consumer nacks event from another consumer group...
final String consumerGroupId = commitContext.getConsumerGroupId();
if (!Objects.equals(consumerGroupId, brokerId)) {
LOGGER.warn(
"inconsistent consumer group when nacking event, current: {}, incoming: {}, consumer id: {}, event commit context: {}, prefetching queue: {}, commit it anyway...",
brokerId,
consumerGroupId,
consumerId,
commitContext,
this);
}
ev.nack(); // now pollable
nacked.set(true);

event.nack(); // now pollable
// no need to update inFlightEvents and prefetchingQueue
return ev;
});

// no need to update inFlightEvents and prefetchingQueue
return true;
return nacked.get();
}

public SubscriptionCommitContext generateSubscriptionCommitContext() {
Expand Down Expand Up @@ -545,7 +569,7 @@ public int getPipeEventCount() {
.map(SubscriptionEvent::getPipeEventCount)
.reduce(Integer::sum)
.orElse(0)
+ +inFlightEvents.values().stream()
+ inFlightEvents.values().stream()
.map(SubscriptionEvent::getPipeEventCount)
.reduce(Integer::sum)
.orElse(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue {

Expand Down Expand Up @@ -69,77 +70,92 @@ public SubscriptionEvent pollTablets(

private @NonNull SubscriptionEvent pollTabletsInternal(
final String consumerId, final SubscriptionCommitContext commitContext, final int offset) {
// 1. Extract current event and check it
final SubscriptionEvent event =
inFlightEvents.compute(
new Pair<>(consumerId, commitContext),
(key, ev) -> {
if (Objects.nonNull(ev) && ev.isCommitted()) {
ev.cleanUp();
return null; // remove this entry
final AtomicReference<SubscriptionEvent> eventRef = new AtomicReference<>();
inFlightEvents.compute(
new Pair<>(consumerId, commitContext),
(key, ev) -> {
// 1. Extract current event and check it
if (Objects.isNull(ev)) {
final String errorMessage =
String.format(
"SubscriptionPrefetchingTabletQueue %s is currently not transferring any tablet to consumer %s, commit context: %s, offset: %s",
this, consumerId, commitContext, offset);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return null;
}

if (ev.isCommitted()) {
ev.cleanUp();
final String errorMessage =
String.format(
"outdated poll request after commit, consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s",
consumerId, commitContext, offset, this);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return null; // remove this entry
}

// check consumer id
if (!Objects.equals(ev.getLastPolledConsumerId(), consumerId)) {
final String errorMessage =
String.format(
"inconsistent polled consumer id, current: %s, incoming: %s, commit context: %s, offset: %s, prefetching queue: %s",
ev.getLastPolledConsumerId(), consumerId, commitContext, offset, this);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return ev;
}

final SubscriptionPollResponse response = ev.getCurrentResponse();
final SubscriptionPollPayload payload = response.getPayload();

// 2. Check previous response type and offset
final short responseType = response.getResponseType();
if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
final String errorMessage = String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return ev;
}

switch (SubscriptionPollResponseType.valueOf(responseType)) {
case TABLETS:
// check offset
if (!Objects.equals(offset, ((TabletsPayload) payload).getNextOffset())) {
final String errorMessage =
String.format(
"inconsistent offset, current: %s, incoming: %s, consumer: %s, prefetching queue: %s",
((TabletsPayload) payload).getNextOffset(), offset, consumerId, this);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return ev;
}
return ev;
});

if (Objects.isNull(event)) {
final String errorMessage =
String.format(
"SubscriptionPrefetchingTabletQueue %s is currently not transferring any tablet to consumer %s, commit context: %s, offset: %s",
this, consumerId, commitContext, offset);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}

// check consumer id
if (!Objects.equals(event.getLastPolledConsumerId(), consumerId)) {
final String errorMessage =
String.format(
"inconsistent polled consumer id, current: %s, incoming: %s, commit context: %s, offset: %s, prefetching queue: %s",
event.getLastPolledConsumerId(), consumerId, commitContext, offset, this);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}
break;
default:
{
final String errorMessage =
String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
return ev;
}
}

final SubscriptionPollResponse response = event.getCurrentResponse();
final SubscriptionPollPayload payload = response.getPayload();
// 3. Poll next tablets
try {
ev.fetchNextResponse();
} catch (final Exception ignored) {
// no exceptions will be thrown
}

// 2. Check previous response type and offset
final short responseType = response.getResponseType();
if (!SubscriptionPollResponseType.isValidatedResponseType(responseType)) {
final String errorMessage = String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}
ev.recordLastPolledTimestamp();
eventRef.set(ev);

switch (SubscriptionPollResponseType.valueOf(responseType)) {
case TABLETS:
// check offset
if (!Objects.equals(offset, ((TabletsPayload) payload).getNextOffset())) {
final String errorMessage =
String.format(
"inconsistent offset, current: %s, incoming: %s, consumer: %s, prefetching queue: %s",
((TabletsPayload) payload).getNextOffset(), offset, consumerId, this);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}
break;
default:
{
final String errorMessage = String.format("unexpected response type: %s", responseType);
LOGGER.warn(errorMessage);
return generateSubscriptionPollErrorResponse(errorMessage);
}
}

// 3. Poll next tablets
try {
event.fetchNextResponse();
} catch (final Exception ignored) {
// no exceptions will be thrown
}
return ev;
});

event.recordLastPolledTimestamp();
return event;
return eventRef.get();
}

/////////////////////////////// prefetch ///////////////////////////////
Expand Down
Loading