Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions example/session/src/main/java/org/apache/iotdb/SessionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,38 +90,38 @@ public static void main(String[] args)
// createTemplate();
createTimeseries();
createMultiTimeseries();
insertRecord();
insertTablet();
// insertRecord();
// insertTablet();
// insertTabletWithNullValues();
// insertTablets();
// insertRecords();
insertRecords();
// insertText();
// selectInto();
// createAndDropContinuousQueries();
// nonQuery();
query();
// query();
// queryWithTimeout();
rawDataQuery();
lastDataQuery();
aggregationQuery();
groupByQuery();
// rawDataQuery();
// lastDataQuery();
// aggregationQuery();
// groupByQuery();
// queryByIterator();
// deleteData();
// deleteTimeseries();
// setTimeout();

sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
sessionEnableRedirect.setEnableQueryRedirection(true);
sessionEnableRedirect.open(false);

// set session fetchSize
sessionEnableRedirect.setFetchSize(10000);

fastLastDataQueryForOneDevice();
insertRecord4Redirect();
query4Redirect();
sessionEnableRedirect.close();
session.close();
// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
// sessionEnableRedirect.setEnableQueryRedirection(true);
// sessionEnableRedirect.open(false);
//
// // set session fetchSize
// sessionEnableRedirect.setFetchSize(10000);
//
// fastLastDataQueryForOneDevice();
// insertRecord4Redirect();
// query4Redirect();
// sessionEnableRedirect.close();
// session.close();
}

private static void createAndDropContinuousQueries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory {

private final TTransportFactory inner;

private RpcTransportFactory(TTransportFactory inner) {
public RpcTransportFactory(TTransportFactory inner) {
this.inner = inner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@
package org.apache.iotdb.rpc;

import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TSnappyElasticFramedTransport.class);
private static final AtomicLong totalUncompressionTime = new AtomicLong(0);
private static final AtomicLong totalUncompressionCount = new AtomicLong(0);
private static final AtomicLong totalOriginalDataSize = new AtomicLong(0);
private static final AtomicLong totalCompressedDataSize = new AtomicLong(0);

public static class Factory extends TElasticFramedTransport.Factory {

Expand Down Expand Up @@ -74,6 +82,23 @@ protected int compress(byte[] input, int inOff, int len, byte[] output, int outO
@Override
protected void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff)
throws IOException {
Snappy.uncompress(input, inOff, size, output, outOff);

long startTime = System.nanoTime();
int uncompressedSize = Snappy.uncompress(input, inOff, size, output, outOff);
long endTime = System.nanoTime();
totalUncompressionTime.addAndGet(endTime - startTime);
totalOriginalDataSize.addAndGet(uncompressedSize);
totalCompressedDataSize.addAndGet(size);
long count = totalUncompressionCount.incrementAndGet();
if (count % 1000 == 0) {
LOGGER.info(
"Average uncompression time: {} ms, average compression rate: {}",
totalUncompressionTime.doubleValue() / count / 1000000,
totalOriginalDataSize.doubleValue() / totalCompressedDataSize.doubleValue());
totalUncompressionCount.set(0);
totalUncompressionTime.set(0);
totalOriginalDataSize.set(0);
totalCompressedDataSize.set(0);
}
}
}
5 changes: 5 additions & 0 deletions iotdb-client/session/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
<groupId>org.mine.rpcutils</groupId>
<artifactId>RPC-Utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
Expand Down Expand Up @@ -72,6 +73,8 @@
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.thrift.TException;
import org.mine.rpc.InsertRecordsReq;
import org.mine.rpc.InsertRecordsSerializeInColumnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -123,6 +126,8 @@ public class Session implements ISession {
protected boolean useSSL;
protected String trustStore;
protected String trustStorePwd;
public static boolean useNewFormat = true;

/**
* Timeout of query can be set by users. A negative number means using the default configuration
* of server. And value 0 will disable the function of query timeout.
Expand Down Expand Up @@ -1904,7 +1909,7 @@ public void insertRecords(
if (enableRedirection) {
insertRecordsWithLeaderCache(
deviceIds, times, measurementsList, typesList, valuesList, false);
} else {
} else if (!useNewFormat) {
TSInsertRecordsReq request;
try {
request =
Expand All @@ -1918,6 +1923,24 @@ public void insertRecords(
defaultSessionConnection.insertRecords(request);
} catch (RedirectException ignored) {
}
} else {
// insert using new column rpc format
InsertRecordsReq req =
new InsertRecordsReq(deviceIds, measurementsList, typesList, valuesList, times);
ByteBuffer buffer = null;
try {
buffer = InsertRecordsSerializeInColumnUtils.encode(req);
} catch (IOException e) {
logger.error("Meets Exception when serializing buffer", e);
return;
}
TSInsertRecordsReqV2ColumnFormat request = new TSInsertRecordsReqV2ColumnFormat();
request.setBuffer(buffer);
try {
defaultSessionConnection.insertRecords(request);
} catch (Exception e) {
logger.error("Meets exception when insert new records", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
Expand Down Expand Up @@ -162,6 +163,13 @@ private void init(TEndPoint endPoint, boolean useSSL, String trustStore, String
session.connectionTimeoutInMs,
trustStore,
trustStorePwd);
} else if (session.enableRPCCompression) {
RpcTransportFactory.setUseSnappy(true);
RpcTransportFactory.reInit();
transport =
RpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use TSocket.wrap
endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs);
} else {
transport =
RpcTransportFactory.INSTANCE.getTransport(
Expand Down Expand Up @@ -870,6 +878,15 @@ private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws TExcep
return client.insertRecords(request);
}

protected void insertRecords(TSInsertRecordsReqV2ColumnFormat request) throws TException {
request.setSessionId(sessionId);
try {
client.insertRecordsV2ColumnFormat(request);
} catch (TException e) {
throw e;
}
}

protected void insertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException, RedirectException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710
# this feature is under development, set this as false before it is done.
# dn_rpc_advanced_compression_enable=false

# dn_data_transfer_compression_enable=false

# Datatype: int
# dn_rpc_selector_thread_count=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class IoTDBConfig {
/** whether to use Snappy compression before sending data through the network */
private boolean rpcAdvancedCompressionEnable = false;

private boolean dataTransportCompressionEnable = false;

/** Port which the JDBC server listens to. */
private int rpcPort = 6667;

Expand Down Expand Up @@ -1058,6 +1060,8 @@ public class IoTDBConfig {
/** whether the local write api records audit logs * */
private boolean enableAuditLogForNativeInsertApi = true;

private boolean enableWALCompression = true;

// customizedProperties, this should be empty by default.
private Properties customizedProperties = new Properties();

Expand Down Expand Up @@ -2589,6 +2593,14 @@ public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable
RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
}

public boolean isDataTransportCompressionEnable() {
return dataTransportCompressionEnable;
}

public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) {
this.dataTransportCompressionEnable = dataTransportCompressionEnable;
}

public int getMlogBufferSize() {
return mlogBufferSize;
}
Expand Down Expand Up @@ -3801,4 +3813,12 @@ public void setInnerCompactionTaskSelectionDiskRedundancy(
double innerCompactionTaskSelectionDiskRedundancy) {
this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy;
}

public boolean isEnableWALCompression() {
return enableWALCompression;
}

public void setEnableWALCompression(boolean enableWALCompression) {
this.enableWALCompression = enableWALCompression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
Boolean.toString(conf.isRpcAdvancedCompressionEnable()))
.trim()));

conf.setDataTransportCompressionEnable(
Boolean.parseBoolean(
properties
.getProperty(
"dn_data_transfer_compression_enable",
Boolean.toString(conf.isDataTransportCompressionEnable()))
.trim()));

conf.setConnectionTimeoutInMS(
Integer.parseInt(
properties
Expand Down Expand Up @@ -434,6 +442,11 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"enable_seq_space_compaction",
Boolean.toString(conf.isEnableSeqSpaceCompaction()))));

conf.setEnableWALCompression(
Boolean.parseBoolean(
properties.getProperty(
"enable_wal_compression", Boolean.toString(conf.isEnableWALCompression()))));

conf.setEnableUnseqSpaceCompaction(
Boolean.parseBoolean(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
Expand Down Expand Up @@ -181,6 +182,8 @@
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.mine.rpc.InsertRecordsReq;
import org.mine.rpc.InsertRecordsSerializeInColumnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1643,6 +1646,7 @@ public TSStatus insertRecords(TSInsertRecordsReq req) {

// Step 1: transfer from TSInsertRecordsReq to Statement
InsertRowsStatement statement = StatementGenerator.createStatement(req);
req = null;
// return success when this statement is empty because server doesn't need to execute it
if (statement.isEmpty()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
Expand Down Expand Up @@ -1694,6 +1698,33 @@ public TSStatus insertRecords(TSInsertRecordsReq req) {
}
}

@Override
public TSStatus insertRecordsV2ColumnFormat(TSInsertRecordsReqV2ColumnFormat req) {
byte[] buffer = req.getBuffer();
if (buffer == null) {
return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, "Buffer is null");
}
try {
InsertRecordsReq originalReq =
InsertRecordsSerializeInColumnUtils.decode(ByteBuffer.wrap(buffer));
InsertRowsStatement statement = StatementGenerator.createStatement(originalReq);
// originalReq = null;
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
COORDINATOR.execute(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
"",
partitionFetcher,
schemaFetcher);
return result.status;
} catch (Exception e) {
LOGGER.error("Meet error while inserting will new request", e);
return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, e.getMessage());
}
}

@Override
public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) {
long t1 = System.nanoTime();
Expand Down
Loading