Skip to content
Closed

Test PR #16779

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d32635d
Added RPC columnEncoder(Rle,Plain) and MetaHead.java
YangYumings Apr 29, 2025
ede1a89
Improve RpcEncoder logic.
YangYumings May 7, 2025
aac22c5
decoder
YangYumings May 9, 2025
af5879b
RPC compressed: The column encoding and decoding of the Plain method …
YangYumings May 13, 2025
f1797d2
Implement the initial full-data compression logic for RPC.
YangYumings May 19, 2025
732d0fb
Refine RPC compression logic
YangYumings May 19, 2025
0179ed6
Fix the BufferUnderflowException in ByteBuffer usage.
YangYumings May 20, 2025
791fd2c
add Corilla columnar encoding/decoding logic
YangYumings May 27, 2025
91748e9
add chimp columnar encoding/decoding logic
YangYumings May 27, 2025
cc1eae6
add Zigzag columnar encoding/decoding logic
YangYumings May 27, 2025
6ed1cdb
add Sprintz columnar encoding/decoding logic
YangYumings May 27, 2025
6fd8566
add RLBE columnar encoding/decoding logic
YangYumings May 27, 2025
2f9fa1f
add Dictionary columnar encoding/decoding logic
YangYumings May 27, 2025
f9847ac
add TS_2DIFF columnar encoding/decoding logic
YangYumings May 27, 2025
0f51c31
add RPC columnar decompression and decoding for monitoring data
YangYumings May 27, 2025
cb2d7a2
Refactor the encoding and compression structure.
YangYumings Jun 3, 2025
51f68be
Add RPC test cases
YangYumings Jun 3, 2025
b081963
Fix code structure
YangYumings Jun 3, 2025
bb3dc16
Fix the metric collection issue for RPC compression.
YangYumings Jun 11, 2025
45f3554
remove gen-java code.
YangYumings Jun 12, 2025
e59dd30
refactor compression
jt2594838 Jul 1, 2025
052a37f
Merge branch 'master' into yym_rpc_compress
jt2594838 Jul 2, 2025
b87fd7c
add test
jt2594838 Jul 3, 2025
cb602ed
revert changes regarding compact protocol
jt2594838 Jul 14, 2025
be542c9
refactor configurations
jt2594838 Jul 15, 2025
e3e9b69
fix compression
jt2594838 Jul 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public ITableSessionPool getTableSessionPool(int maxSize) {
.maxSize(maxSize)
.fetchSize(SessionConfig.DEFAULT_FETCH_SIZE)
.waitToGetSessionTimeoutInMs(60_000)
.enableCompression(false)
.enableThriftCompression(false)
.zoneId(null)
.enableRedirection(SessionConfig.DEFAULT_REDIRECTION_MODE)
.connectionTimeoutInMs(SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS)
Expand All @@ -267,7 +267,7 @@ public ITableSessionPool getTableSessionPool(int maxSize, String database) {
.maxSize(maxSize)
.fetchSize(SessionConfig.DEFAULT_FETCH_SIZE)
.waitToGetSessionTimeoutInMs(60_000)
.enableCompression(false)
.enableThriftCompression(false)
.zoneId(null)
.enableRedirection(SessionConfig.DEFAULT_REDIRECTION_MODE)
.connectionTimeoutInMs(SessionConfig.DEFAULT_CONNECTION_TIMEOUT_MS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.session.it;

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.isession.ITableSession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.TableSessionBuilder;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class IoTDBSessionCompressedIT {

private static ITableSession session1;
private static ITableSession session2;
private static ITableSession session3;
private static ITableSession session4;

@BeforeClass
public static void setUpClass() throws IoTDBConnectionException {
EnvFactory.getEnv().initClusterEnvironment();

List<String> nodeUrls =
EnvFactory.getEnv().getDataNodeWrapperList().stream()
.map(DataNodeWrapper::getIpAndPortString)
.collect(Collectors.toList());
// List<String> nodeUrls = Collections.singletonList("127.0.0.1:6667");
session1 =
new TableSessionBuilder()
.nodeUrls(nodeUrls)
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
.enableCompression(true)
.enableRedirection(true)
.enableAutoFetch(false)
.withCompressionType(CompressionType.SNAPPY)
.withBooleanEncoding(TSEncoding.PLAIN)
.withInt32Encoding(TSEncoding.CHIMP)
.withInt64Encoding(TSEncoding.CHIMP)
.withFloatEncoding(TSEncoding.CHIMP)
.withDoubleEncoding(TSEncoding.CHIMP)
.withBlobEncoding(TSEncoding.PLAIN)
.withStringEncoding(TSEncoding.PLAIN)
.withTextEncoding(TSEncoding.PLAIN)
.withDateEncoding(TSEncoding.PLAIN)
.withTimeStampEncoding(TSEncoding.PLAIN)
.build();
session2 =
new TableSessionBuilder()
.nodeUrls(nodeUrls)
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
.enableCompression(true)
.enableRedirection(true)
.enableAutoFetch(false)
.withCompressionType(CompressionType.SNAPPY)
.withBooleanEncoding(TSEncoding.PLAIN)
.withInt32Encoding(TSEncoding.SPRINTZ)
.withInt64Encoding(TSEncoding.SPRINTZ)
.withFloatEncoding(TSEncoding.RLBE)
.withDoubleEncoding(TSEncoding.RLBE)
.withBlobEncoding(TSEncoding.PLAIN)
.withStringEncoding(TSEncoding.PLAIN)
.withTextEncoding(TSEncoding.PLAIN)
.withDateEncoding(TSEncoding.PLAIN)
.withTimeStampEncoding(TSEncoding.SPRINTZ)
.build();
session3 =
new TableSessionBuilder()
.nodeUrls(nodeUrls)
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
.enableCompression(true)
.enableRedirection(true)
.enableAutoFetch(false)
.withCompressionType(CompressionType.GZIP)
.withBooleanEncoding(TSEncoding.RLE)
.withInt32Encoding(TSEncoding.TS_2DIFF)
.withInt64Encoding(TSEncoding.RLE)
.withFloatEncoding(TSEncoding.TS_2DIFF)
.withDoubleEncoding(TSEncoding.RLE)
.withBlobEncoding(TSEncoding.PLAIN)
.withStringEncoding(TSEncoding.PLAIN)
.withTextEncoding(TSEncoding.PLAIN)
.withDateEncoding(TSEncoding.RLE)
.withTimeStampEncoding(TSEncoding.RLE)
.build();
session4 =
new TableSessionBuilder()
.nodeUrls(nodeUrls)
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
.enableCompression(true)
.enableRedirection(true)
.enableAutoFetch(false)
.withCompressionType(CompressionType.LZMA2)
.withBooleanEncoding(TSEncoding.PLAIN)
.withInt32Encoding(TSEncoding.GORILLA)
.withInt64Encoding(TSEncoding.ZIGZAG)
.withFloatEncoding(TSEncoding.GORILLA)
.withDoubleEncoding(TSEncoding.GORILLA)
.withBlobEncoding(TSEncoding.PLAIN)
.withStringEncoding(TSEncoding.PLAIN)
.withTextEncoding(TSEncoding.PLAIN)
.withDateEncoding(TSEncoding.RLE)
.withTimeStampEncoding(TSEncoding.ZIGZAG)
.build();
}

@AfterClass
public static void tearDownClass() throws IoTDBConnectionException {
if (session1 != null) {
session1.close();
}
if (session2 != null) {
session2.close();
}
if (session3 != null) {
session3.close();
}
if (session4 != null) {
session4.close();
}
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void testRpcCompressed() throws IoTDBConnectionException, StatementExecutionException {
List<IMeasurementSchema> schemas = new ArrayList<>();
MeasurementSchema schema = new MeasurementSchema();
schema.setMeasurementName("pressure0");
schema.setDataType(TSDataType.INT32);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure1");
schema.setDataType(TSDataType.INT64);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure2");
schema.setDataType(TSDataType.FLOAT);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure3");
schema.setDataType(TSDataType.DOUBLE);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure4");
schema.setDataType(TSDataType.TEXT);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure5");
schema.setDataType(TSDataType.BOOLEAN);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure6");
schema.setDataType(TSDataType.STRING);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);
schema = new MeasurementSchema();
schema.setMeasurementName("pressure7");
schema.setDataType(TSDataType.BLOB);
schema.setCompressionType(CompressionType.SNAPPY);
schema.setEncoding(TSEncoding.PLAIN);
schemas.add(schema);

long[] timestamp = new long[] {3L, 4L, 5L, 6L};
Object[] values = new Object[8];
values[0] = new int[] {1, 2, 8, 15};
values[1] = new long[] {1L, 2L, 8L, 15L};
values[2] = new float[] {1.1f, 1.2f, 8.8f, 15.5f};
values[3] = new double[] {0.707, 0.708, 8.8, 15.5};
values[4] =
new Binary[] {
new Binary(new byte[] {(byte) 32}),
new Binary(new byte[] {(byte) 16}),
new Binary(new byte[] {(byte) 1}),
new Binary(new byte[] {(byte) 56})
};
values[5] = new boolean[] {true, false, true, false};
values[6] =
new Binary[] {
new Binary(new byte[] {(byte) 32}),
new Binary(new byte[] {(byte) 16}),
new Binary(new byte[] {(byte) 1}),
new Binary(new byte[] {(byte) 56})
};
values[7] =
new Binary[] {
new Binary(new byte[] {(byte) 32}),
new Binary(new byte[] {(byte) 16}),
new Binary(new byte[] {(byte) 1}),
new Binary(new byte[] {(byte) 56})
};
BitMap[] partBitMap = new BitMap[8];

String tableName = "table_13";
Tablet tablet = new Tablet(tableName, schemas, timestamp, values, partBitMap, 4);

session1.executeNonQueryStatement("create database IF NOT EXISTS dbTest_0");
session1.executeNonQueryStatement("use dbTest_0");
session2.executeNonQueryStatement("use dbTest_0");
session3.executeNonQueryStatement("use dbTest_0");
session4.executeNonQueryStatement("use dbTest_0");

// 1. insert
session1.insert(tablet);
session2.insert(tablet);
session3.insert(tablet);
session4.insert(tablet);

// 2. assert
SessionDataSet sessionDataSet1 =
session1.executeQueryStatement("select * from dbTest_0." + tableName);
SessionDataSet sessionDataSet2 =
session2.executeQueryStatement("select * from dbTest_0." + tableName);
SessionDataSet sessionDataSet3 =
session3.executeQueryStatement("select * from dbTest_0." + tableName);
SessionDataSet sessionDataSet4 =
session4.executeQueryStatement("select * from dbTest_0." + tableName);

if (sessionDataSet1.hasNext()) {
RowRecord next = sessionDataSet1.next();
Assert.assertEquals(3L, next.getFields().get(0).getLongV());
Assert.assertEquals(1, next.getFields().get(1).getIntV());
Assert.assertEquals(1L, next.getFields().get(2).getLongV());
Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(5).getBinaryV());
Assert.assertEquals(true, next.getFields().get(6).getBoolV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(7).getBinaryV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(8).getBinaryV());
}
if (sessionDataSet2.hasNext()) {
RowRecord next = sessionDataSet2.next();
Assert.assertEquals(3L, next.getFields().get(0).getLongV());
Assert.assertEquals(1, next.getFields().get(1).getIntV());
Assert.assertEquals(1L, next.getFields().get(2).getLongV());
Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(5).getBinaryV());
Assert.assertEquals(true, next.getFields().get(6).getBoolV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(7).getBinaryV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(8).getBinaryV());
}
if (sessionDataSet3.hasNext()) {
RowRecord next = sessionDataSet3.next();
Assert.assertEquals(3L, next.getFields().get(0).getLongV());
Assert.assertEquals(1, next.getFields().get(1).getIntV());
Assert.assertEquals(1L, next.getFields().get(2).getLongV());
Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(5).getBinaryV());
Assert.assertEquals(true, next.getFields().get(6).getBoolV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(7).getBinaryV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(8).getBinaryV());
}
if (sessionDataSet4.hasNext()) {
RowRecord next = sessionDataSet4.next();
Assert.assertEquals(3L, next.getFields().get(0).getLongV());
Assert.assertEquals(1, next.getFields().get(1).getIntV());
Assert.assertEquals(1L, next.getFields().get(2).getLongV());
Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(5).getBinaryV());
Assert.assertEquals(true, next.getFields().get(6).getBoolV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(7).getBinaryV());
Assert.assertEquals(new Binary(new byte[] {(byte) 32}), next.getFields().get(8).getBinaryV());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void init() throws InterruptedException {
.user(username)
.password(password)
.maxSize(threadNum + 1)
.enableCompression(false)
.enableThriftCompression(false)
.enableRedirection(false)
.enableAutoFetch(false)
.database(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void init() throws InterruptedException {
.user(username)
.password(password)
.maxSize(threadNum + 1)
.enableCompression(false)
.enableThriftCompression(false)
.enableRedirection(false)
.enableAutoFetch(false)
.database(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void init() throws InterruptedException {
.user(username)
.password(password)
.maxSize(threadNum + 1)
.enableCompression(false)
.enableThriftCompression(false)
.enableRedirection(false)
.enableAutoFetch(false)
.database(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ SessionDataSetWrapper executeAggregationQuery(

long getWaitToGetSessionTimeoutInMs();

boolean isEnableCompression();
boolean isEnableThriftCompression();

void setEnableRedirection(boolean enableRedirection);

Expand Down
Loading