diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java index ec09b546648b4..9ea34939864c6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java @@ -45,7 +45,6 @@ import java.io.File; import java.io.IOException; -import java.net.Socket; import java.util.HashMap; import java.util.Objects; @@ -56,9 +55,8 @@ public class IoTDBConfigRegionAirGapConnector extends IoTDBAirGapConnector { @Override protected byte[] generateHandShakeV1Payload() throws IOException { - return compressIfNeeded( - PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); + return PipeTransferConfigNodeHandshakeV1Req.toTPipeTransferBytes( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); } @Override @@ -71,7 +69,7 @@ protected byte[] generateHandShakeV2Payload() throws IOException { PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); - return compressIfNeeded(PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params)); + return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params); } @Override @@ -107,7 +105,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { final int socketIndex = nextSocketIndex(); - final Socket socket = sockets.get(socketIndex); + final AirGapSocket socket = sockets.get(socketIndex); try { if (event instanceof PipeConfigRegionWritePlanEvent) { @@ -131,7 +129,8 @@ public void transfer(final Event event) throws Exception { } private void doTransferWrapper( - final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) + final AirGapSocket socket, + final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -147,13 +146,14 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) + final AirGapSocket socket, + final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent) throws PipeException, IOException { if (!send( + pipeConfigRegionWritePlanEvent.getPipeName(), socket, - compressIfNeeded( - PipeTransferConfigPlanReq.toTPipeTransferBytes( - pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())))) { + PipeTransferConfigPlanReq.toTPipeTransferBytes( + pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) { final String errorMessage = String.format( "Transfer config region write plan %s error. Socket: %s.", @@ -170,7 +170,7 @@ private void doTransfer( } private void doTransferWrapper( - final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) + final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -186,29 +186,30 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) + final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent) throws PipeException, IOException { + final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName(); final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile(); final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile(); // 1. Transfer snapshotFile, and template file if exists - transferFilePieces(snapshot, socket, true); + transferFilePieces(pipeName, snapshot, socket, true); if (Objects.nonNull(templateFile)) { - transferFilePieces(templateFile, socket, true); + transferFilePieces(pipeName, templateFile, socket, true); } // 2. Transfer file seal signal, which means the snapshots are transferred completely if (!send( + pipeName, socket, - compressIfNeeded( - PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes( - // The pattern is surely Non-null - pipeConfigRegionSnapshotEvent.getPatternString(), - snapshot.getName(), - snapshot.length(), - Objects.nonNull(templateFile) ? templateFile.getName() : null, - Objects.nonNull(templateFile) ? templateFile.length() : 0, - pipeConfigRegionSnapshotEvent.getFileType(), - pipeConfigRegionSnapshotEvent.toSealTypeString())))) { + PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes( + // The pattern is surely Non-null + pipeConfigRegionSnapshotEvent.getPatternString(), + snapshot.getName(), + snapshot.length(), + Objects.nonNull(templateFile) ? templateFile.getName() : null, + Objects.nonNull(templateFile) ? templateFile.length() : 0, + pipeConfigRegionSnapshotEvent.getFileType(), + pipeConfigRegionSnapshotEvent.toSealTypeString()))) { final String errorMessage = String.format("Seal config region snapshot %s error. Socket %s.", snapshot, socket); // Send handshake because we don't know whether the receiver side configNode diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java index b199df4c390cc..00279bf22d9f6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java @@ -38,6 +38,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.Pair; @@ -123,13 +124,15 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri final TPipeTransferResp resp; try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferConfigPlanReq.toTPipeTransferReq( - pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferConfigPlanReq.toTPipeTransferReq( + pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan())); + rateLimitIfNeeded( + pipeConfigRegionWritePlanEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( @@ -177,32 +180,35 @@ private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigReg private void doTransfer(final PipeConfigRegionSnapshotEvent snapshotEvent) throws PipeException, IOException { + final String pipeName = snapshotEvent.getPipeName(); final File snapshotFile = snapshotEvent.getSnapshotFile(); final File templateFile = snapshotEvent.getTemplateFile(); final Pair clientAndStatus = clientManager.getClient(); // 1. Transfer snapshotFile, and template File if exists - transferFilePieces(snapshotFile, clientAndStatus, true); + transferFilePieces(pipeName, snapshotFile, clientAndStatus, true); if (Objects.nonNull(templateFile)) { - transferFilePieces(templateFile, clientAndStatus, true); + transferFilePieces(pipeName, templateFile, clientAndStatus, true); } // 2. Transfer file seal signal, which means the snapshots are transferred completely final TPipeTransferResp resp; try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferConfigSnapshotSealReq.toTPipeTransferReq( - // The pattern is surely Non-null - snapshotEvent.getPatternString(), - snapshotFile.getName(), - snapshotFile.length(), - Objects.nonNull(templateFile) ? templateFile.getName() : null, - Objects.nonNull(templateFile) ? templateFile.length() : 0, - snapshotEvent.getFileType(), - snapshotEvent.toSealTypeString()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferConfigSnapshotSealReq.toTPipeTransferReq( + // The pattern is surely Non-null + snapshotEvent.getPatternString(), + snapshotFile.getName(), + snapshotFile.length(), + Objects.nonNull(templateFile) ? templateFile.getName() : null, + Objects.nonNull(templateFile) ? templateFile.length() : 0, + snapshotEvent.getFileType(), + snapshotEvent.toSealTypeString())); + rateLimitIfNeeded( + snapshotEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5e481330d0909..2f4714d0cc50c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1107,7 +1107,7 @@ public class IoTDBConfig { private long loadCleanupTaskExecutionDelayTimeSeconds = 1800L; // 30 min - private double loadWriteThroughputBytesPerSecond = Double.MAX_VALUE; // Bytes/s + private double loadWriteThroughputBytesPerSecond = -1; // Bytes/s /** Pipe related */ /** initialized as empty, updated based on the latest `systemDir` during querying */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 82e2e09fbfa02..6ac64b6c48298 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -922,7 +922,7 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO conf.setLoadWriteThroughputBytesPerSecond( Double.parseDouble( properties.getProperty( - "load_write_throughput_bytes_per_sec", + "load_write_throughput_bytes_per_second", String.valueOf(conf.getLoadWriteThroughputBytesPerSecond())))); conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim()); @@ -1718,9 +1718,19 @@ public void loadHotModifiedProps(Properties properties) throws QueryProcessExcep conf.setLoadWriteThroughputBytesPerSecond( Double.parseDouble( properties.getProperty( - "load_write_throughput_bytes_per_sec", + "load_write_throughput_bytes_per_second", String.valueOf(conf.getLoadWriteThroughputBytesPerSecond())))); + // update pipe config + commonDescriptor + .getConfig() + .setPipeAllSinksRateLimitBytesPerSecond( + Double.parseDouble( + properties.getProperty( + "pipe_all_sinks_rate_limit_bytes_per_second", + String.valueOf( + commonDescriptor.getConfig().getPipeAllSinksRateLimitBytesPerSecond())))); + // update merge_threshold_of_explain_analyze conf.setMergeThresholdOfExplainAnalyze( Integer.parseInt( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java index 74b49ce7272b4..44d72b1e1c794 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeEventBatch.java @@ -39,7 +39,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; public class PipeEventBatch implements AutoCloseable { @@ -61,6 +63,9 @@ public class PipeEventBatch implements AutoCloseable { private final PipeMemoryBlock allocatedMemoryBlock; private long totalBufferSize = 0; + // Used to rate limit when transferring data + private final Map pipeName2BytesAccumulated = new HashMap<>(); + public PipeEventBatch(int maxDelayInMs, long requestMaxBatchSizeInBytes) { this.maxDelayInMs = maxDelayInMs; this.allocatedMemoryBlock = @@ -112,6 +117,10 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) final int bufferSize = buildTabletInsertionBuffer(event); totalBufferSize += bufferSize; + pipeName2BytesAccumulated.compute( + ((EnrichedEvent) event).getPipeName(), + (pipeName, bytesAccumulated) -> + bytesAccumulated == null ? bufferSize : bytesAccumulated + bufferSize); if (firstEventProcessingTime == Long.MIN_VALUE) { firstEventProcessingTime = System.currentTimeMillis(); @@ -137,6 +146,7 @@ public synchronized void onSuccess() { firstEventProcessingTime = Long.MIN_VALUE; totalBufferSize = 0; + pipeName2BytesAccumulated.clear(); } public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException { @@ -160,6 +170,14 @@ public List deepCopyRequestCommitIds() { return new ArrayList<>(requestCommitIds); } + public Map deepCopyPipeName2BytesAccumulated() { + return new HashMap<>(pipeName2BytesAccumulated); + } + + public Map getPipeName2BytesAccumulated() { + return pipeName2BytesAccumulated; + } + private int buildTabletInsertionBuffer(final TabletInsertionEvent event) throws IOException, WALPipeException { final ByteBuffer buffer; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 01c7de76da232..f6e80a2891760 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.Socket; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Set; @@ -88,9 +87,8 @@ protected boolean mayNeedHandshakeWhenFail() { @Override protected byte[] generateHandShakeV1Payload() throws IOException { - return compressIfNeeded( - PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); + return PipeTransferDataNodeHandshakeV1Req.toTPipeTransferBytes( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); } @Override @@ -103,11 +101,12 @@ protected byte[] generateHandShakeV2Payload() throws IOException { PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); - return compressIfNeeded(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params)); + return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params); } protected void doTransferWrapper( - final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) + final AirGapSocket socket, + final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -123,13 +122,14 @@ protected void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) + final AirGapSocket socket, + final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent) throws PipeException, IOException { if (!send( + pipeSchemaRegionWritePlanEvent.getPipeName(), socket, - compressIfNeeded( - PipeTransferPlanNodeReq.toTPipeTransferBytes( - pipeSchemaRegionWritePlanEvent.getPlanNode())))) { + PipeTransferPlanNodeReq.toTPipeTransferBytes( + pipeSchemaRegionWritePlanEvent.getPlanNode()))) { final String errorMessage = String.format( "Transfer data node write plan %s error. Socket: %s.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java index 84ca0b0ccdb87..e83cd623fed64 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java @@ -47,7 +47,6 @@ import java.io.File; import java.io.IOException; -import java.net.Socket; import java.util.Objects; public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector { @@ -69,7 +68,7 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc } final int socketIndex = nextSocketIndex(); - final Socket socket = sockets.get(socketIndex); + final AirGapSocket socket = sockets.get(socketIndex); try { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { @@ -106,7 +105,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc } final int socketIndex = nextSocketIndex(); - final Socket socket = sockets.get(socketIndex); + final AirGapSocket socket = sockets.get(socketIndex); try { doTransferWrapper(socket, (PipeTsFileInsertionEvent) tsFileInsertionEvent); @@ -125,7 +124,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { final int socketIndex = nextSocketIndex(); - final Socket socket = sockets.get(socketIndex); + final AirGapSocket socket = sockets.get(socketIndex); try { if (event instanceof PipeSchemaRegionWritePlanEvent) { @@ -147,7 +146,7 @@ public void transfer(final Event event) throws Exception { } private void doTransferWrapper( - final Socket socket, + final AirGapSocket socket, final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException { try { @@ -164,19 +163,18 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, + final AirGapSocket socket, final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, WALPipeException, IOException { final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible(); final byte[] bytes = - compressIfNeeded( - Objects.isNull(insertNode) - ? PipeTransferTabletBinaryReq.toTPipeTransferBytes( - pipeInsertNodeTabletInsertionEvent.getByteBuffer()) - : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode)); + Objects.isNull(insertNode) + ? PipeTransferTabletBinaryReq.toTPipeTransferBytes( + pipeInsertNodeTabletInsertionEvent.getByteBuffer()) + : PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode); - if (!send(socket, bytes)) { + if (!send(pipeInsertNodeTabletInsertionEvent.getPipeName(), socket, bytes)) { final String errorMessage = String.format( "Transfer PipeInsertNodeTabletInsertionEvent %s error. Socket: %s", @@ -190,7 +188,7 @@ private void doTransfer( } private void doTransferWrapper( - final Socket socket, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) + final AirGapSocket socket, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -206,14 +204,14 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) + final AirGapSocket socket, final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, IOException { if (!send( + pipeRawTabletInsertionEvent.getPipeName(), socket, - compressIfNeeded( - PipeTransferTabletRawReq.toTPipeTransferBytes( - pipeRawTabletInsertionEvent.convertToTablet(), - pipeRawTabletInsertionEvent.isAligned())))) { + PipeTransferTabletRawReq.toTPipeTransferBytes( + pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.isAligned()))) { final String errorMessage = String.format( "Transfer PipeRawTabletInsertionEvent %s error. Socket: %s.", @@ -227,7 +225,7 @@ private void doTransfer( } private void doTransferWrapper( - final Socket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) + final AirGapSocket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -243,22 +241,23 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) + final AirGapSocket socket, final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { + final String pipeName = pipeTsFileInsertionEvent.getPipeName(); final File tsFile = pipeTsFileInsertionEvent.getTsFile(); final String errorMessage = String.format("Seal file %s error. Socket %s.", tsFile, socket); // 1. Transfer file piece by piece, and mod if needed if (pipeTsFileInsertionEvent.isWithMod() && supportModsIfIsDataNodeReceiver) { final File modFile = pipeTsFileInsertionEvent.getModFile(); - transferFilePieces(modFile, socket, true); - transferFilePieces(tsFile, socket, true); + transferFilePieces(pipeName, modFile, socket, true); + transferFilePieces(pipeName, tsFile, socket, true); // 2. Transfer file seal signal with mod, which means the file is transferred completely if (!send( + pipeName, socket, - compressIfNeeded( - PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())))) { + PipeTransferTsFileSealWithModReq.toTPipeTransferBytes( + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()))) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), @@ -268,12 +267,12 @@ private void doTransfer( LOGGER.info("Successfully transferred file {}.", tsFile); } } else { - transferFilePieces(tsFile, socket, false); + transferFilePieces(pipeName, tsFile, socket, false); // 2. Transfer file seal signal without mod, which means the file is transferred completely if (!send( + pipeName, socket, - compressIfNeeded( - PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length())))) { + PipeTransferTsFileSealReq.toTPipeTransferBytes(tsFile.getName(), tsFile.length()))) { receiverStatusHandler.handle( new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) .setMessage(errorMessage), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java index f390994e4f0f1..a97408266b7a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java @@ -38,7 +38,6 @@ import java.io.File; import java.io.IOException; -import java.net.Socket; import java.util.Objects; public class IoTDBSchemaRegionAirGapConnector extends IoTDBDataNodeAirGapConnector { @@ -61,7 +60,7 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc @Override public void transfer(final Event event) throws Exception { final int socketIndex = nextSocketIndex(); - final Socket socket = sockets.get(socketIndex); + final AirGapSocket socket = sockets.get(socketIndex); try { if (event instanceof PipeSchemaRegionWritePlanEvent) { @@ -85,7 +84,7 @@ public void transfer(final Event event) throws Exception { } private void doTransferWrapper( - final Socket socket, final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) + final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) throws PipeException, IOException { try { // We increase the reference count for this event to determine if the event may be released. @@ -101,29 +100,30 @@ private void doTransferWrapper( } private void doTransfer( - final Socket socket, final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) + final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent pipeSchemaRegionSnapshotEvent) throws PipeException, IOException { + final String pipeName = pipeSchemaRegionSnapshotEvent.getPipeName(); final File mtreeSnapshotFile = pipeSchemaRegionSnapshotEvent.getMTreeSnapshotFile(); final File tagLogSnapshotFile = pipeSchemaRegionSnapshotEvent.getTagLogSnapshotFile(); // 1. Transfer mTreeSnapshotFile, and tLog file if exists - transferFilePieces(mtreeSnapshotFile, socket, true); + transferFilePieces(pipeName, mtreeSnapshotFile, socket, true); if (Objects.nonNull(tagLogSnapshotFile)) { - transferFilePieces(tagLogSnapshotFile, socket, true); + transferFilePieces(pipeName, tagLogSnapshotFile, socket, true); } // 2. Transfer file seal signal, which means the snapshots is transferred completely if (!send( + pipeName, socket, - compressIfNeeded( - PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes( - // The pattern is surely Non-null - pipeSchemaRegionSnapshotEvent.getPatternString(), - mtreeSnapshotFile.getName(), - mtreeSnapshotFile.length(), - Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() : null, - Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() : 0, - pipeSchemaRegionSnapshotEvent.getDatabaseName(), - pipeSchemaRegionSnapshotEvent.toSealTypeString())))) { + PipeTransferSchemaSnapshotSealReq.toTPipeTransferBytes( + // The pattern is surely Non-null + pipeSchemaRegionSnapshotEvent.getPatternString(), + mtreeSnapshotFile.getName(), + mtreeSnapshotFile.length(), + Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() : null, + Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() : 0, + pipeSchemaRegionSnapshotEvent.getDatabaseName(), + pipeSchemaRegionSnapshotEvent.toSealTypeString()))) { final String errorMessage = String.format( "Seal schema region snapshot file %s and %s error. Socket %s.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 632d929e3724a..3edcedc31d8b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -196,11 +196,8 @@ private void transferWithoutCheck(final TabletInsertionEvent tabletInsertionEven pipeInsertNodeTabletInsertionEvent, pipeTransferReq, this); transfer( - // insertNode.getDevicePath() is null for InsertRowsNode - Objects.nonNull(insertNode) && Objects.nonNull(insertNode.getDevicePath()) - ? insertNode.getDevicePath().getFullPath() - : null, - pipeTransferInsertNodeReqHandler); + // getDeviceId() may return null for InsertRowsNode + pipeInsertNodeTabletInsertionEvent.getDeviceId(), pipeTransferInsertNodeReqHandler); } else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent; @@ -491,7 +488,7 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { @Override // synchronized to avoid close connector when transfer event - public synchronized void close() throws Exception { + public synchronized void close() { isClosed.set(true); retryConnector.close(); @@ -500,6 +497,8 @@ public synchronized void close() throws Exception { if (tabletBatchBuilder != null) { tabletBatchBuilder.close(); } + + super.close(); } //////////////////////////// APIs provided for metric framework //////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java index 88e4134395077..7407b1b9554eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback { @@ -50,7 +51,10 @@ public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback< private final List requestCommitIds; private final List events; + private final Map pipeName2BytesAccumulated; + private final TPipeTransferReq req; + private final double reqCompressionRatio; private final IoTDBDataRegionAsyncConnector connector; @@ -60,16 +64,25 @@ public PipeTransferTabletBatchEventHandler( // Deep copy to keep Ids' and events' reference requestCommitIds = batch.deepCopyRequestCommitIds(); events = batch.deepCopyEvents(); + pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated(); + + final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq(); req = connector.isRpcCompressionEnabled() - ? batch.toTPipeTransferReq() - : PipeTransferCompressedReq.toTPipeTransferReq( - batch.toTPipeTransferReq(), connector.getCompressors()); + ? PipeTransferCompressedReq.toTPipeTransferReq( + uncompressedReq, connector.getCompressors()) + : uncompressedReq; + reqCompressionRatio = (double) req.getBody().length / uncompressedReq.getBody().length; this.connector = connector; } public void transfer(final AsyncPipeDataTransferServiceClient client) throws TException { + for (final Map.Entry entry : pipeName2BytesAccumulated.entrySet()) { + connector.rateLimitIfNeeded( + entry.getKey(), client.getEndPoint(), (long) (entry.getValue() * reqCompressionRatio)); + } + client.pipeTransfer(req, this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java index 5d834440d7fe9..94fa6d5ac1a3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java @@ -53,6 +53,11 @@ protected PipeTransferTabletInsertionEventHandler( } public void transfer(AsyncPipeDataTransferServiceClient client) throws TException { + if (event instanceof EnrichedEvent) { + connector.rateLimitIfNeeded( + ((EnrichedEvent) event).getPipeName(), client.getEndPoint(), req.getBody().length); + } + doTransfer(client, req); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java index 8d02aeb8fa87a..e7e374443c64b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java @@ -122,15 +122,22 @@ public void transfer( transfer(clientManager, client); } else if (currentFile == tsFile) { isSealSignalSent.set(true); - client.pipeTransfer( - PipeTransferCompressedReq.toTPipeTransferReq( - transferMod - ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) - : PipeTransferTsFileSealReq.toTPipeTransferReq( - tsFile.getName(), tsFile.length()), - connector.getCompressors()), - this); + + final TPipeTransferReq uncompressedReq = + transferMod + ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()) + : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()); + final TPipeTransferReq req = + connector.isRpcCompressionEnabled() + ? PipeTransferCompressedReq.toTPipeTransferReq( + uncompressedReq, connector.getCompressors()) + : uncompressedReq; + + connector.rateLimitIfNeeded( + event.getPipeName(), client.getEndPoint(), req.getBody().length); + + client.pipeTransfer(req, this); } return; } @@ -145,12 +152,16 @@ public void transfer( currentFile.getName(), position, payload) : PipeTransferTsFilePieceReq.toTPipeTransferReq( currentFile.getName(), position, payload); - client.pipeTransfer( + final TPipeTransferReq req = connector.isRpcCompressionEnabled() ? PipeTransferCompressedReq.toTPipeTransferReq( uncompressedReq, connector.getCompressors()) - : uncompressedReq, - this); + : uncompressedReq; + + connector.rateLimitIfNeeded(event.getPipeName(), client.getEndPoint(), req.getBody().length); + + client.pipeTransfer(req, this); + position += readLength; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index 33c97ce27f291..aef655c324070 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -34,6 +34,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.Pair; @@ -113,13 +114,15 @@ protected void doTransfer(final PipeSchemaRegionWritePlanEvent pipeSchemaRegionW final TPipeTransferResp resp; try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferPlanNodeReq.toTPipeTransferReq( - pipeSchemaRegionWritePlanEvent.getPlanNode()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferPlanNodeReq.toTPipeTransferReq( + pipeSchemaRegionWritePlanEvent.getPlanNode())); + rateLimitIfNeeded( + pipeSchemaRegionWritePlanEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index 8ee65ef94edcd..cf93bac6ced8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -48,6 +48,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.Pair; @@ -56,6 +57,7 @@ import java.io.File; import java.io.IOException; +import java.util.Map; import java.util.Objects; public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { @@ -176,10 +178,23 @@ private void doTransfer(Pair endPointAndBatch) { final TPipeTransferResp resp; try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer(compressIfNeeded(batchToTransfer.toTPipeTransferReq())); + final TPipeTransferReq uncompressedReq = batchToTransfer.toTPipeTransferReq(); + final long uncompressedSize = uncompressedReq.getBody().length; + + final TPipeTransferReq req = compressIfNeeded(uncompressedReq); + final long compressedSize = req.getBody().length; + + final double compressionRatio = (double) compressedSize / uncompressedSize; + + for (final Map.Entry entry : + batchToTransfer.getPipeName2BytesAccumulated().entrySet()) { + rateLimitIfNeeded( + entry.getKey(), + clientAndStatus.getLeft().getEndPoint(), + (long) (entry.getValue() * compressionRatio)); + } + + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( @@ -230,30 +245,26 @@ private void doTransferWrapper( private void doTransfer( final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException { - final InsertNode insertNode; - Pair clientAndStatus = null; final TPipeTransferResp resp; + Pair clientAndStatus = null; try { - insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible(); // getDeviceId() may return null for InsertRowsNode, will be equal to getClient(null) clientAndStatus = clientManager.getClient(pipeInsertNodeTabletInsertionEvent.getDeviceId()); - if (insertNode != null) { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode))); - } else { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferTabletBinaryReq.toTPipeTransferReq( - pipeInsertNodeTabletInsertionEvent.getByteBuffer()))); - } + + final InsertNode insertNode = + pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible(); + final TPipeTransferReq req = + compressIfNeeded( + insertNode != null + ? PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode) + : PipeTransferTabletBinaryReq.toTPipeTransferReq( + pipeInsertNodeTabletInsertionEvent.getByteBuffer())); + rateLimitIfNeeded( + pipeInsertNodeTabletInsertionEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { if (clientAndStatus != null) { clientAndStatus.setRight(false); @@ -276,10 +287,11 @@ private void doTransfer( pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status), pipeInsertNodeTabletInsertionEvent.toString()); } - // insertNode.getDevicePath() is null for InsertRowsNode - if (insertNode != null && insertNode.getDevicePath() != null && status.isSetRedirectNode()) { + // pipeInsertNodeTabletInsertionEvent.getDeviceId() is null for InsertRowsNode + if (Objects.nonNull(pipeInsertNodeTabletInsertionEvent.getDeviceId()) + && status.isSetRedirectNode()) { clientManager.updateLeaderCache( - insertNode.getDevicePath().getFullPath(), status.getRedirectNode()); + pipeInsertNodeTabletInsertionEvent.getDeviceId(), status.getRedirectNode()); } } @@ -305,14 +317,16 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion final TPipeTransferResp resp; try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferTabletRawReq.toTPipeTransferReq( - pipeRawTabletInsertionEvent.convertToTablet(), - pipeRawTabletInsertionEvent.isAligned()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferTabletRawReq.toTPipeTransferReq( + pipeRawTabletInsertionEvent.convertToTablet(), + pipeRawTabletInsertionEvent.isAligned())); + rateLimitIfNeeded( + pipeRawTabletInsertionEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( @@ -356,6 +370,7 @@ private void doTransferWrapper(final PipeTsFileInsertionEvent pipeTsFileInsertio private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException { + final String pipeName = pipeTsFileInsertionEvent.getPipeName(); final File tsFile = pipeTsFileInsertionEvent.getTsFile(); final File modFile = pipeTsFileInsertionEvent.getModFile(); final Pair clientAndStatus = clientManager.getClient(); @@ -363,20 +378,19 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) // 1. Transfer tsFile, and mod file if exists and receiver's version >= 2 if (pipeTsFileInsertionEvent.isWithMod() && clientManager.supportModsIfIsDataNodeReceiver()) { - transferFilePieces(modFile, clientAndStatus, true); - transferFilePieces(tsFile, clientAndStatus, true); + transferFilePieces(pipeName, modFile, clientAndStatus, true); + transferFilePieces(pipeName, tsFile, clientAndStatus, true); // 2. Transfer file seal signal with mod, which means the file is transferred completely try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), - modFile.length(), - tsFile.getName(), - tsFile.length()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())); + rateLimitIfNeeded( + pipeTsFileInsertionEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); clientManager.adjustTimeoutIfNecessary(e); @@ -385,16 +399,17 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) e); } } else { - transferFilePieces(tsFile, clientAndStatus, false); + transferFilePieces(pipeName, tsFile, clientAndStatus, false); // 2. Transfer file seal signal without mod, which means the file is transferred completely try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferTsFileSealReq.toTPipeTransferReq( - tsFile.getName(), tsFile.length()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length())); + rateLimitIfNeeded( + pipeTsFileInsertionEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); clientManager.adjustTimeoutIfNecessary(e); @@ -419,10 +434,10 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) @Override public void close() { - super.close(); - if (tabletBatchBuilder != null) { tabletBatchBuilder.close(); } + + super.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java index 2b2cba03f20db..210bbec3dc0d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java @@ -32,6 +32,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.Pair; @@ -87,32 +88,35 @@ private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent pipeSchemaReg private void doTransfer(final PipeSchemaRegionSnapshotEvent snapshotEvent) throws PipeException, IOException { + final String pipeName = snapshotEvent.getPipeName(); final File mTreeSnapshotFile = snapshotEvent.getMTreeSnapshotFile(); final File tagLogSnapshotFile = snapshotEvent.getTagLogSnapshotFile(); final Pair clientAndStatus = clientManager.getClient(); final TPipeTransferResp resp; // 1. Transfer mTreeSnapshotFile, and tLog file if exists - transferFilePieces(mTreeSnapshotFile, clientAndStatus, true); + transferFilePieces(pipeName, mTreeSnapshotFile, clientAndStatus, true); if (Objects.nonNull(tagLogSnapshotFile)) { - transferFilePieces(tagLogSnapshotFile, clientAndStatus, true); + transferFilePieces(pipeName, tagLogSnapshotFile, clientAndStatus, true); } // 2. Transfer file seal signal, which means the snapshots are transferred completely try { - resp = - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq( - // The pattern is surely Non-null - snapshotEvent.getPatternString(), - mTreeSnapshotFile.getName(), - mTreeSnapshotFile.length(), - Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() : null, - Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() : 0, - snapshotEvent.getDatabaseName(), - snapshotEvent.toSealTypeString()))); + final TPipeTransferReq req = + compressIfNeeded( + PipeTransferSchemaSnapshotSealReq.toTPipeTransferReq( + // The pattern is surely Non-null + snapshotEvent.getPatternString(), + mTreeSnapshotFile.getName(), + mTreeSnapshotFile.length(), + Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.getName() : null, + Objects.nonNull(tagLogSnapshotFile) ? tagLogSnapshotFile.length() : 0, + snapshotEvent.getDatabaseName(), + snapshotEvent.toSealTypeString())); + rateLimitIfNeeded( + snapshotEvent.getPipeName(), + clientAndStatus.getLeft().getEndPoint(), + req.getBody().length); + resp = clientAndStatus.getLeft().pipeTransfer(req); } catch (final Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java index 8aa7841a8d077..ce7046bf5ee54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java @@ -33,22 +33,19 @@ public class LoadTsFileRateLimiter { new AtomicDouble(CONFIG.getLoadWriteThroughputBytesPerSecond()); private final RateLimiter loadWriteRateLimiter; - private LoadTsFileRateLimiter() { - final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); - loadWriteRateLimiter = - // if throughput <= 0, disable rate limiting - throughputBytesPerSecondLimit <= 0 - ? RateLimiter.create(Double.MAX_VALUE) - : RateLimiter.create(throughputBytesPerSecondLimit); - } - public void acquire(long bytes) { - if (throughputBytesPerSecond.get() != CONFIG.getLoadWriteThroughputBytesPerSecond()) { - final double newThroughputBytesPerSecond = CONFIG.getLoadWriteThroughputBytesPerSecond(); - throughputBytesPerSecond.set(newThroughputBytesPerSecond); + final double throughputBytesPerSecondLimit = CONFIG.getLoadWriteThroughputBytesPerSecond(); + + if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { + throughputBytesPerSecond.set(throughputBytesPerSecondLimit); loadWriteRateLimiter.setRate( // if throughput <= 0, disable rate limiting - newThroughputBytesPerSecond <= 0 ? Double.MAX_VALUE : newThroughputBytesPerSecond); + throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : throughputBytesPerSecondLimit); + } + + // For performance, we don't need to acquire rate limiter if throughput <= 0 + if (throughputBytesPerSecondLimit <= 0) { + return; } while (bytes > 0) { @@ -64,6 +61,15 @@ public void acquire(long bytes) { //////////////////////////// Singleton //////////////////////////// + private LoadTsFileRateLimiter() { + final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); + loadWriteRateLimiter = + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 + ? RateLimiter.create(Double.MAX_VALUE) + : RateLimiter.create(throughputBytesPerSecondLimit); + } + private static class LoadTsFileRateLimiterHolder { private static final LoadTsFileRateLimiter INSTANCE = new LoadTsFileRateLimiter(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index 4ea3048f2c389..843374ec49ccf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -46,16 +46,12 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; public class TreeModelPlanner implements IPlanner { - private static final Logger LOGGER = LoggerFactory.getLogger(TreeModelPlanner.class); private final Statement statement; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 827eeca2ad7c2..099f76fb3b7db 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -965,6 +965,11 @@ data_replication_factor=1 # The port for the server to receive pipe data through air gap. # pipe_air_gap_receiver_port=9780 +# The total bytes that all pipe sinks can transfer per second. +# When given a value less than or equal to 0, it means no limit. +# default value is -1, which means no limit. +# pipe_all_sinks_rate_limit_bytes_per_second=-1 + #################### ### RatisConsensus Configuration #################### @@ -1104,3 +1109,8 @@ data_replication_factor=1 # Load clean up task is used to clean up the unsuccessful loaded tsfile after a certain period of time. # The parameter is the delay time after an unsuccessful load operation (in seconds). # load_clean_up_task_execution_delay_time_seconds=1800 + +# The maximum bytes per second of disk write throughput when loading tsfile. +# When given a value less than or equal to 0, it means no limit. +# Default value is -1, which means no limit. +# load_write_throughput_bytes_per_second=-1 diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 4572516fbee60..91fa97fc82c7e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -165,6 +165,10 @@ public int getPort() { return endpoint.getPort(); } + public TEndPoint getEndPoint() { + return endpoint; + } + @Override public String toString() { return String.format("AsyncPipeDataTransferServiceClient{%s}, id = {%d}", endpoint, id); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index fc437a3de9790..c03cf539711c1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -194,6 +194,8 @@ public class CommonConfig { private int pipeAsyncConnectorSelectorNumber = 4; private int pipeAsyncConnectorMaxClientNumber = 16; + private double pipeAllSinksRateLimitBytesPerSecond = -1; + private boolean isSeperatedPipeHeartbeatEnabled = true; private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100; private long pipeMetaSyncerInitialSyncDelayMinutes = 3; @@ -1007,6 +1009,14 @@ public void setPipeRemainingTimeCommitRateSmoothingFactor( this.pipeRemainingTimeCommitRateSmoothingFactor = pipeRemainingTimeCommitRateSmoothingFactor; } + public double getPipeAllSinksRateLimitBytesPerSecond() { + return pipeAllSinksRateLimitBytesPerSecond; + } + + public void setPipeAllSinksRateLimitBytesPerSecond(double pipeAllSinksRateLimitBytesPerSecond) { + this.pipeAllSinksRateLimitBytesPerSecond = pipeAllSinksRateLimitBytesPerSecond; + } + public long getTwoStageAggregateMaxCombinerLiveTimeInMs() { return twoStageAggregateMaxCombinerLiveTimeInMs; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index efb3420b62ca7..b472b155aefa8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -377,6 +377,12 @@ private void loadPipeProps(Properties properties) { "pipe_async_connector_max_client_number", String.valueOf(config.getPipeAsyncConnectorMaxClientNumber()))))); + config.setPipeAllSinksRateLimitBytesPerSecond( + Double.parseDouble( + properties.getProperty( + "pipe_all_sinks_rate_limit_bytes_per_second", + String.valueOf(config.getPipeAllSinksRateLimitBytesPerSecond())))); + config.setSeperatedPipeHeartbeatEnabled( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 41331a822ddac..dfd48e7eccb91 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -123,6 +123,10 @@ public int getPipeAsyncConnectorMaxClientNumber() { return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber(); } + public double getPipeAllConnectorsRateLimitBytesPerSecond() { + return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond(); + } + public float getPipeLeaderCacheMemoryUsagePercentage() { return COMMON_CONFIG.getPipeLeaderCacheMemoryUsagePercentage(); } @@ -324,6 +328,10 @@ public void printAllConfigs() { LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", getPipeAsyncConnectorSelectorNumber()); LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", getPipeAsyncConnectorMaxClientNumber()); + LOGGER.info( + "PipeAllConnectorsRateLimitBytesPerSecond: {}", + getPipeAllConnectorsRateLimitBytesPerSecond()); + LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", isSeperatedPipeHeartbeatEnabled()); LOGGER.info( "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}", diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index c15fc4db1890f..5e73c4d54d88b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -177,6 +177,10 @@ public class PipeConnectorConstant { CONNECTOR_COMPRESSOR_ZSTD, CONNECTOR_COMPRESSOR_LZMA2))); + public static final String CONNECTOR_RATE_LIMIT_KEY = "connector.rate-limit-bytes-per-second"; + public static final String SINK_RATE_LIMIT_KEY = "sink.rate-limit-bytes-per-second"; + public static final double CONNECTOR_RATE_LIMIT_DEFAULT_VALUE = -1; + public static final String SINK_TOPIC_KEY = "sink.topic"; public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java index 3ab7a156b74e6..f15934afd5ea2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClient.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.connector.client; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; @@ -33,6 +34,7 @@ public class IoTDBSyncClient extends IClientRPCService.Client private final String ipAddress; private final int port; + private final TEndPoint endPoint; public IoTDBSyncClient( ThriftClientProperty property, @@ -57,6 +59,7 @@ public IoTDBSyncClient( ipAddress, port, property.getConnectionTimeoutMs()))); this.ipAddress = ipAddress; this.port = port; + this.endPoint = new TEndPoint(ipAddress, port); final TTransport transport = getInputProtocol().getTransport(); if (!transport.isOpen()) { transport.open(); @@ -71,6 +74,10 @@ public int getPort() { return port; } + public TEndPoint getEndPoint() { + return endPoint; + } + public void setTimeout(int timeout) { ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java new file mode 100644 index 0000000000000..575731d04a5ec --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/GlobalRateLimiter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.limiter; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +import com.google.common.util.concurrent.AtomicDouble; +import com.google.common.util.concurrent.RateLimiter; + +/** This is a global rate limiter for all connectors. */ +public class GlobalRateLimiter { + + private static final PipeConfig CONFIG = PipeConfig.getInstance(); + + private final AtomicDouble throughputBytesPerSecond = + new AtomicDouble(CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond()); + private final RateLimiter rateLimiter; + + public GlobalRateLimiter() { + final double throughputBytesPerSecondLimit = throughputBytesPerSecond.get(); + rateLimiter = + throughputBytesPerSecondLimit <= 0 + ? RateLimiter.create(Double.MAX_VALUE) + : RateLimiter.create(throughputBytesPerSecondLimit); + } + + public void acquire(long bytes) { + final double throughputBytesPerSecondLimit = + CONFIG.getPipeAllConnectorsRateLimitBytesPerSecond(); + + if (throughputBytesPerSecond.get() != throughputBytesPerSecondLimit) { + throughputBytesPerSecond.set(throughputBytesPerSecondLimit); + rateLimiter.setRate( + // if throughput <= 0, disable rate limiting + throughputBytesPerSecondLimit <= 0 ? Double.MAX_VALUE : throughputBytesPerSecondLimit); + } + + // For performance, we don't need to acquire rate limiter if throughput <= 0 + if (throughputBytesPerSecondLimit <= 0) { + return; + } + + while (bytes > 0) { + if (bytes > Integer.MAX_VALUE) { + rateLimiter.acquire(Integer.MAX_VALUE); + bytes -= Integer.MAX_VALUE; + } else { + rateLimiter.acquire((int) bytes); + return; + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java new file mode 100644 index 0000000000000..8b15164457667 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/limiter/PipeEndPointRateLimiter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.connector.limiter; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; + +import com.google.common.util.concurrent.RateLimiter; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class PipeEndPointRateLimiter { + + private final double bytesPerSecondLimit; + + private final ConcurrentMap endPointRateLimiterMap; + + public PipeEndPointRateLimiter(double bytesPerSecondLimit) { + this.bytesPerSecondLimit = bytesPerSecondLimit; + endPointRateLimiterMap = new ConcurrentHashMap<>(); + } + + public void acquire(final TEndPoint endPoint, long bytes) { + if (endPoint == null) { + return; + } + + final RateLimiter rateLimiter = + endPointRateLimiterMap.computeIfAbsent( + endPoint, e -> RateLimiter.create(bytesPerSecondLimit)); + + while (bytes > 0) { + if (bytes > Integer.MAX_VALUE) { + rateLimiter.acquire(Integer.MAX_VALUE); + bytes -= Integer.MAX_VALUE; + } else { + rateLimiter.acquire((int) bytes); + return; + } + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java index ec2d22ddfa646..7aac1ffc90df1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.pipe.connector.protocol; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapELanguageConstant; @@ -57,11 +58,29 @@ public abstract class IoTDBAirGapConnector extends IoTDBConnector { + protected static class AirGapSocket extends Socket { + + private final TEndPoint endPoint; + + public AirGapSocket(String ip, int port) { + this.endPoint = new TEndPoint(ip, port); + } + + public TEndPoint getEndPoint() { + return endPoint; + } + + @Override + public String toString() { + return "AirGapSocket{" + "endPoint=" + endPoint + "} (" + super.toString() + ")"; + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapConnector.class); protected static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); - protected final List sockets = new ArrayList<>(); + protected final List sockets = new ArrayList<>(); protected final List isSocketAlive = new ArrayList<>(); private LoadBalancer loadBalancer; @@ -147,7 +166,7 @@ public void handshake() throws Exception { } } - final Socket socket = new Socket(); + final AirGapSocket socket = new AirGapSocket(ip, port); try { socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); @@ -176,7 +195,7 @@ public void handshake() throws Exception { String.format("All target servers %s are not available.", nodeUrls)); } - protected void sendHandshakeReq(Socket socket) throws IOException { + protected void sendHandshakeReq(AirGapSocket socket) throws IOException { socket.setSoTimeout(handshakeTimeoutMs); // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to handshake by // PipeTransferHandshakeV1Req. If failed again, throw PipeConnectionException. @@ -208,7 +227,8 @@ public void heartbeat() { } } - protected void transferFilePieces(File file, Socket socket, boolean isMultiFile) + protected void transferFilePieces( + String pipeName, File file, AirGapSocket socket, boolean isMultiFile) throws PipeException, IOException { final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; @@ -225,11 +245,11 @@ protected void transferFilePieces(File file, Socket socket, boolean isMultiFile) ? readBuffer : Arrays.copyOfRange(readBuffer, 0, readLength); if (!send( + pipeName, socket, - compressIfNeeded( - isMultiFile - ? getTransferMultiFilePieceBytes(file.getName(), position, payload) - : getTransferSingleFilePieceBytes(file.getName(), position, payload)))) { + isMultiFile + ? getTransferMultiFilePieceBytes(file.getName(), position, payload) + : getTransferSingleFilePieceBytes(file.getName(), position, payload))) { final String errorMessage = String.format("Transfer file %s error. Socket %s.", file, socket); if (mayNeedHandshakeWhenFail()) { @@ -261,11 +281,15 @@ protected int nextSocketIndex() { return loadBalancer.nextSocketIndex(); } - protected boolean send(Socket socket, byte[] bytes) throws IOException { + protected boolean send(String pipeName, AirGapSocket socket, byte[] bytes) throws IOException { if (!socket.isConnected()) { return false; } + bytes = compressIfNeeded(bytes); + + rateLimitIfNeeded(pipeName, socket.getEndPoint(), bytes.length); + final BufferedOutputStream outputStream = new BufferedOutputStream(socket.getOutputStream()); bytes = enrichWithLengthAndChecksum(bytes); outputStream.write(eLanguageEnable ? enrichWithELanguage(bytes) : bytes); @@ -276,6 +300,10 @@ protected boolean send(Socket socket, byte[] bytes) throws IOException { return size > 0 && Arrays.equals(AirGapOneByteResponse.OK, response); } + protected boolean send(AirGapSocket socket, byte[] bytes) throws IOException { + return send(null, socket, bytes); + } + private byte[] enrichWithLengthAndChecksum(byte[] bytes) { // Length of checksum and bytes payload final byte[] length = BytesUtils.intToBytes(bytes.length + LONG_LEN); @@ -309,6 +337,8 @@ public void close() { isSocketAlive.set(i, false); } } + + super.close(); } /////////////////////// Strategies for load balance ////////////////////////// diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 751f0dec05f52..f65daf86d4ff8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -23,6 +23,8 @@ import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor; import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory; +import org.apache.iotdb.commons.pipe.connector.limiter.GlobalRateLimiter; +import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter; import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.pipe.api.PipeConnector; @@ -40,8 +42,10 @@ import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_COMPRESSOR_KEY; @@ -65,6 +69,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_BALANCE_STRATEGY_SET; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_COMPRESSOR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY; @@ -77,6 +83,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; public abstract class IoTDBConnector implements PipeConnector { @@ -91,8 +98,13 @@ public abstract class IoTDBConnector implements PipeConnector { protected String loadBalanceStrategy; - protected boolean isRpcCompressionEnabled; - protected final List compressors = new ArrayList<>(); + private boolean isRpcCompressionEnabled; + private final List compressors = new ArrayList<>(); + + private static final Map PIPE_END_POINT_RATE_LIMITER_MAP = + new ConcurrentHashMap<>(); + private double endPointRateLimitBytesPerSecond = -1; + private static final GlobalRateLimiter GLOBAL_RATE_LIMITER = new GlobalRateLimiter(); protected boolean isTabletBatchModeEnabled = true; @@ -169,6 +181,17 @@ public void validate(PipeParameterValidator validator) throws Exception { compressors.size()); isRpcCompressionEnabled = !compressors.isEmpty(); + endPointRateLimitBytesPerSecond = + parameters.getDoubleOrDefault( + Arrays.asList(CONNECTOR_RATE_LIMIT_KEY, SINK_RATE_LIMIT_KEY), + CONNECTOR_RATE_LIMIT_DEFAULT_VALUE); + validator.validate( + arg -> endPointRateLimitBytesPerSecond <= Double.MAX_VALUE, + String.format( + "Rate limit should be in the range (0, %f], but got %f.", + Double.MAX_VALUE, endPointRateLimitBytesPerSecond), + endPointRateLimitBytesPerSecond); + validator.validate( arg -> arg.equals("retry") || arg.equals("ignore"), String.format( @@ -302,6 +325,12 @@ private void checkNodeUrls(Set nodeUrls) throws PipeParameterNotValid } } + @Override + public void close() { + // TODO: Not all the limiters should be closed here, but it's fine for now. + PIPE_END_POINT_RATE_LIMITER_MAP.clear(); + } + protected TPipeTransferReq compressIfNeeded(TPipeTransferReq req) throws IOException { return isRpcCompressionEnabled ? PipeTransferCompressedReq.toTPipeTransferReq(req, compressors) @@ -322,6 +351,18 @@ public List getCompressors() { return compressors; } + public void rateLimitIfNeeded( + final String pipeName, final TEndPoint endPoint, final long bytesLength) { + if (pipeName != null && endPointRateLimitBytesPerSecond > 0) { + PIPE_END_POINT_RATE_LIMITER_MAP + .computeIfAbsent( + pipeName, endpoint -> new PipeEndPointRateLimiter(endPointRateLimitBytesPerSecond)) + .acquire(endPoint, bytesLength); + } + + GLOBAL_RATE_LIMITER.acquire(bytesLength); + } + public PipeReceiverStatusHandler statusHandler() { return receiverStatusHandler; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java index 4e1ab4d478915..713f25df29ef0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java @@ -32,6 +32,7 @@ import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.Pair; @@ -143,7 +144,10 @@ public void heartbeat() { } protected void transferFilePieces( - File file, Pair clientAndStatus, boolean isMultiFile) + String pipeName, + File file, + Pair clientAndStatus, + boolean isMultiFile) throws PipeException, IOException { final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); final byte[] readBuffer = new byte[readFileBufferSize]; @@ -161,16 +165,16 @@ protected void transferFilePieces( : Arrays.copyOfRange(readBuffer, 0, readLength); final PipeTransferFilePieceResp resp; try { + final TPipeTransferReq req = + compressIfNeeded( + isMultiFile + ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) + : getTransferSingleFilePieceReq(file.getName(), position, payLoad)); + rateLimitIfNeeded( + pipeName, clientAndStatus.getLeft().getEndPoint(), req.getBody().length); resp = PipeTransferFilePieceResp.fromTPipeTransferResp( - clientAndStatus - .getLeft() - .pipeTransfer( - compressIfNeeded( - isMultiFile - ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) - : getTransferSingleFilePieceReq( - file.getName(), position, payLoad)))); + clientAndStatus.getLeft().pipeTransfer(req)); } catch (Exception e) { clientAndStatus.setRight(false); throw new PipeConnectionException( @@ -219,5 +223,7 @@ public void close() { if (clientManager != null) { clientManager.close(); } + + super.close(); } }