Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pipe-it-2cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ jobs:
java: [ 17 ]
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ]
cluster2: [ LightWeightStandaloneMode ]
cluster2: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ]
os: [ ubuntu-latest ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.PublicBAOS;

import java.io.DataOutputStream;
Expand All @@ -32,6 +33,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class PipeSubscribePollResp extends TPipeSubscribeResp {

Expand All @@ -48,16 +50,17 @@ public List<EnrichedTablets> getEnrichedTabletsList() {
* server.
*/
public static PipeSubscribePollResp toTPipeSubscribeResp(
TSStatus status, List<EnrichedTablets> enrichedTabletsList) {
TSStatus status, List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList) {
final PipeSubscribePollResp resp = new PipeSubscribePollResp();

resp.enrichedTabletsList = enrichedTabletsList;
resp.enrichedTabletsList =
enrichedTabletsWithByteBufferList.stream().map(Pair::getRight).collect(Collectors.toList());

resp.status = status;
resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion();
resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType();
try {
resp.body = serializeEnrichedTabletsList(enrichedTabletsList);
resp.body = serializeEnrichedTabletsWithByteBufferList(enrichedTabletsWithByteBufferList);
} catch (IOException e) {
resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, e.getMessage());
}
Expand Down Expand Up @@ -108,6 +111,21 @@ public static List<ByteBuffer> serializeEnrichedTabletsList(
return byteBufferList;
}

public static List<ByteBuffer> serializeEnrichedTabletsWithByteBufferList(
List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList)
throws IOException {
List<ByteBuffer> byteBufferList = new ArrayList<>();
for (Pair<ByteBuffer, EnrichedTablets> enrichedTabletsWithByteBuffer :
enrichedTabletsWithByteBufferList) {
if (Objects.nonNull(enrichedTabletsWithByteBuffer.getLeft())) {
byteBufferList.add(enrichedTabletsWithByteBuffer.getLeft());
} else {
byteBufferList.add(serializeEnrichedTablets(enrichedTabletsWithByteBuffer.getRight()));
}
}
return byteBufferList;
}

public static ByteBuffer serializeEnrichedTablets(EnrichedTablets enrichedTablets)
throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public SerializedEnrichedEvent(
this.committedTimestamp = INVALID_TIMESTAMP;
}

//////////////////////////// serialization ////////////////////////////

public EnrichedTablets getEnrichedTablets() {
return enrichedTablets;
}

/** @return true -> byte buffer is not null */
public boolean serialize() {
if (Objects.isNull(byteBuffer)) {
Expand All @@ -80,6 +86,8 @@ public void resetByteBuffer() {
byteBuffer = null;
}

//////////////////////////// commit ////////////////////////////

public String getSubscriptionCommitId() {
return enrichedTablets.getSubscriptionCommitId();
}
Expand All @@ -90,10 +98,6 @@ public void decreaseReferenceCount() {
}
}

public void recordLastPolledTimestamp() {
lastPolledTimestamp = Math.max(lastPolledTimestamp, System.currentTimeMillis());
}

public void recordCommittedTimestamp() {
committedTimestamp = System.currentTimeMillis();
}
Expand All @@ -102,6 +106,12 @@ public boolean isCommitted() {
return committedTimestamp != INVALID_TIMESTAMP;
}

//////////////////////////// pollable ////////////////////////////

public void recordLastPolledTimestamp() {
lastPolledTimestamp = Math.max(lastPolledTimestamp, System.currentTimeMillis());
}

public boolean pollable() {
if (lastPolledTimestamp == INVALID_TIMESTAMP) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq;
Expand All @@ -59,6 +60,7 @@
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.tsfile.utils.Pair;

import org.apache.thrift.TException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -325,14 +327,6 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq
SubscriptionConfig.getInstance().getSubscriptionMinPollTimeoutMs()));
List<SerializedEnrichedEvent> events =
SubscriptionAgent.broker().poll(consumerConfig, topicNames, timer);

// serialize events and filter
events =
events.stream()
.peek((SerializedEnrichedEvent::serialize))
.filter((event -> Objects.nonNull(event.getByteBuffer())))
.collect(Collectors.toList());

List<String> subscriptionCommitIds =
events.stream()
.map(SerializedEnrichedEvent::getSubscriptionCommitId)
Expand All @@ -344,18 +338,21 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq
consumerConfig,
topicNames);
}

LOGGER.info(
"Subscription: consumer {} poll topics {} successfully, commit ids: {}",
consumerConfig,
topicNames,
subscriptionCommitIds);

// fetch and reset byte buffer
List<ByteBuffer> byteBuffers =
events.stream().map(SerializedEnrichedEvent::getByteBuffer).collect(Collectors.toList());
List<Pair<ByteBuffer, EnrichedTablets>> enrichedTabletsWithByteBufferList =
events.stream()
.map(event -> new Pair<>(event.getByteBuffer(), event.getEnrichedTablets()))
.collect(Collectors.toList());
TPipeSubscribeResp resp =
PipeSubscribePollResp.toTPipeSubscribeResp(
RpcUtils.SUCCESS_STATUS, enrichedTabletsWithByteBufferList);
events.forEach(SerializedEnrichedEvent::resetByteBuffer);
return PipeSubscribePollResp.directToTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, byteBuffers);
return resp;
}

private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq req) {
Expand Down