From a385d2d4750f7c9f0ced87c1a7c8ffb936127016 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 1 Apr 2024 14:08:44 +0800 Subject: [PATCH] try fix --- .github/workflows/pipe-it-2cluster.yml | 2 +- .../response/PipeSubscribePollResp.java | 24 ++++++++++++++++--- .../broker/SerializedEnrichedEvent.java | 18 ++++++++++---- .../receiver/SubscriptionReceiverV1.java | 23 ++++++++---------- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 040154f8cb908..c39bfae159b3a 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -109,7 +109,7 @@ jobs: java: [ 17 ] # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. cluster1: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ] - cluster2: [ LightWeightStandaloneMode ] + cluster2: [ LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} steps: diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java index 2616a8316fdd1..0d1ed4eb0e2ff 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribePollResp.java @@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.PublicBAOS; import java.io.DataOutputStream; @@ -32,6 +33,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; public class PipeSubscribePollResp extends TPipeSubscribeResp { @@ -48,16 +50,17 @@ public List getEnrichedTabletsList() { * server. */ public static PipeSubscribePollResp toTPipeSubscribeResp( - TSStatus status, List enrichedTabletsList) { + TSStatus status, List> enrichedTabletsWithByteBufferList) { final PipeSubscribePollResp resp = new PipeSubscribePollResp(); - resp.enrichedTabletsList = enrichedTabletsList; + resp.enrichedTabletsList = + enrichedTabletsWithByteBufferList.stream().map(Pair::getRight).collect(Collectors.toList()); resp.status = status; resp.version = PipeSubscribeResponseVersion.VERSION_1.getVersion(); resp.type = PipeSubscribeResponseType.POLL_TABLETS.getType(); try { - resp.body = serializeEnrichedTabletsList(enrichedTabletsList); + resp.body = serializeEnrichedTabletsWithByteBufferList(enrichedTabletsWithByteBufferList); } catch (IOException e) { resp.status = RpcUtils.getStatus(TSStatusCode.SUBSCRIPTION_POLL_ERROR, e.getMessage()); } @@ -108,6 +111,21 @@ public static List serializeEnrichedTabletsList( return byteBufferList; } + public static List serializeEnrichedTabletsWithByteBufferList( + List> enrichedTabletsWithByteBufferList) + throws IOException { + List byteBufferList = new ArrayList<>(); + for (Pair enrichedTabletsWithByteBuffer : + enrichedTabletsWithByteBufferList) { + if (Objects.nonNull(enrichedTabletsWithByteBuffer.getLeft())) { + byteBufferList.add(enrichedTabletsWithByteBuffer.getLeft()); + } else { + byteBufferList.add(serializeEnrichedTablets(enrichedTabletsWithByteBuffer.getRight())); + } + } + return byteBufferList; + } + public static ByteBuffer serializeEnrichedTablets(EnrichedTablets enrichedTablets) throws IOException { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java index 4b3808f1a0937..419579713fd46 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java @@ -54,6 +54,12 @@ public SerializedEnrichedEvent( this.committedTimestamp = INVALID_TIMESTAMP; } + //////////////////////////// serialization //////////////////////////// + + public EnrichedTablets getEnrichedTablets() { + return enrichedTablets; + } + /** @return true -> byte buffer is not null */ public boolean serialize() { if (Objects.isNull(byteBuffer)) { @@ -80,6 +86,8 @@ public void resetByteBuffer() { byteBuffer = null; } + //////////////////////////// commit //////////////////////////// + public String getSubscriptionCommitId() { return enrichedTablets.getSubscriptionCommitId(); } @@ -90,10 +98,6 @@ public void decreaseReferenceCount() { } } - public void recordLastPolledTimestamp() { - lastPolledTimestamp = Math.max(lastPolledTimestamp, System.currentTimeMillis()); - } - public void recordCommittedTimestamp() { committedTimestamp = System.currentTimeMillis(); } @@ -102,6 +106,12 @@ public boolean isCommitted() { return committedTimestamp != INVALID_TIMESTAMP; } + //////////////////////////// pollable //////////////////////////// + + public void recordLastPolledTimestamp() { + lastPolledTimestamp = Math.max(lastPolledTimestamp, System.currentTimeMillis()); + } + public boolean pollable() { if (lastPolledTimestamp == INVALID_TIMESTAMP) { return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 22a3b8ee1a2dd..d0e35211816a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -39,6 +39,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; +import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets; import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq; import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq; import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeHandshakeReq; @@ -59,6 +60,7 @@ import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeUnsubscribeResp; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -325,14 +327,6 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq SubscriptionConfig.getInstance().getSubscriptionMinPollTimeoutMs())); List events = SubscriptionAgent.broker().poll(consumerConfig, topicNames, timer); - - // serialize events and filter - events = - events.stream() - .peek((SerializedEnrichedEvent::serialize)) - .filter((event -> Objects.nonNull(event.getByteBuffer()))) - .collect(Collectors.toList()); - List subscriptionCommitIds = events.stream() .map(SerializedEnrichedEvent::getSubscriptionCommitId) @@ -344,18 +338,21 @@ private TPipeSubscribeResp handlePipeSubscribePollInternal(PipeSubscribePollReq consumerConfig, topicNames); } - LOGGER.info( "Subscription: consumer {} poll topics {} successfully, commit ids: {}", consumerConfig, topicNames, subscriptionCommitIds); - // fetch and reset byte buffer - List byteBuffers = - events.stream().map(SerializedEnrichedEvent::getByteBuffer).collect(Collectors.toList()); + List> enrichedTabletsWithByteBufferList = + events.stream() + .map(event -> new Pair<>(event.getByteBuffer(), event.getEnrichedTablets())) + .collect(Collectors.toList()); + TPipeSubscribeResp resp = + PipeSubscribePollResp.toTPipeSubscribeResp( + RpcUtils.SUCCESS_STATUS, enrichedTabletsWithByteBufferList); events.forEach(SerializedEnrichedEvent::resetByteBuffer); - return PipeSubscribePollResp.directToTPipeSubscribeResp(RpcUtils.SUCCESS_STATUS, byteBuffers); + return resp; } private TPipeSubscribeResp handlePipeSubscribeCommit(PipeSubscribeCommitReq req) {