diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8c0..0b0c2ff8cafc3 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -90,38 +90,38 @@ public static void main(String[] args) // createTemplate(); createTimeseries(); createMultiTimeseries(); - insertRecord(); - insertTablet(); + // insertRecord(); + // insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - // insertRecords(); + insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - query(); + // query(); // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); + // rawDataQuery(); + // lastDataQuery(); + // aggregationQuery(); + // groupByQuery(); // queryByIterator(); // deleteData(); // deleteTimeseries(); // setTimeout(); - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); - session.close(); + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); + // + // // set session fetchSize + // sessionEnableRedirect.setFetchSize(10000); + // + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); + // session.close(); } private static void createAndDropContinuousQueries() diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java index 191f0a42f9531..c1d799f2aadd2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory { private final TTransportFactory inner; - private RpcTransportFactory(TTransportFactory inner) { + public RpcTransportFactory(TTransportFactory inner) { this.inner = inner; } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java index a85ccf0c5b53d..817c994061a62 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java @@ -20,11 +20,19 @@ package org.apache.iotdb.rpc; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport { + private static final Logger LOGGER = LoggerFactory.getLogger(TSnappyElasticFramedTransport.class); + private static final AtomicLong totalUncompressionTime = new AtomicLong(0); + private static final AtomicLong totalUncompressionCount = new AtomicLong(0); + private static final AtomicLong totalOriginalDataSize = new AtomicLong(0); + private static final AtomicLong totalCompressedDataSize = new AtomicLong(0); public static class Factory extends TElasticFramedTransport.Factory { @@ -74,6 +82,23 @@ protected int compress(byte[] input, int inOff, int len, byte[] output, int outO @Override protected void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff) throws IOException { - Snappy.uncompress(input, inOff, size, output, outOff); + + long startTime = System.nanoTime(); + int uncompressedSize = Snappy.uncompress(input, inOff, size, output, outOff); + long endTime = System.nanoTime(); + totalUncompressionTime.addAndGet(endTime - startTime); + totalOriginalDataSize.addAndGet(uncompressedSize); + totalCompressedDataSize.addAndGet(size); + long count = totalUncompressionCount.incrementAndGet(); + if (count % 1000 == 0) { + LOGGER.info( + "Average uncompression time: {} ms, average compression rate: {}", + totalUncompressionTime.doubleValue() / count / 1000000, + totalOriginalDataSize.doubleValue() / totalCompressedDataSize.doubleValue()); + totalUncompressionCount.set(0); + totalUncompressionTime.set(0); + totalOriginalDataSize.set(0); + totalCompressedDataSize.set(0); + } } } diff --git a/iotdb-client/session/pom.xml b/iotdb-client/session/pom.xml index dcaa351591439..1c77ca417a7be 100644 --- a/iotdb-client/session/pom.xml +++ b/iotdb-client/session/pom.xml @@ -72,6 +72,11 @@ org.apache.thrift libthrift + + org.mine.rpcutils + RPC-Utils + 1.0-SNAPSHOT + junit junit diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 4d0ad33efd71c..66c137fc8f8ca 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -45,6 +45,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -72,6 +73,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.TException; +import org.mine.rpc.InsertRecordsReq; +import org.mine.rpc.InsertRecordsSerializeInColumnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +126,8 @@ public class Session implements ISession { protected boolean useSSL; protected String trustStore; protected String trustStorePwd; + public static boolean useNewFormat = true; + /** * Timeout of query can be set by users. A negative number means using the default configuration * of server. And value 0 will disable the function of query timeout. @@ -1904,7 +1909,7 @@ public void insertRecords( if (enableRedirection) { insertRecordsWithLeaderCache( deviceIds, times, measurementsList, typesList, valuesList, false); - } else { + } else if (!useNewFormat) { TSInsertRecordsReq request; try { request = @@ -1918,6 +1923,24 @@ public void insertRecords( defaultSessionConnection.insertRecords(request); } catch (RedirectException ignored) { } + } else { + // insert using new column rpc format + InsertRecordsReq req = + new InsertRecordsReq(deviceIds, measurementsList, typesList, valuesList, times); + ByteBuffer buffer = null; + try { + buffer = InsertRecordsSerializeInColumnUtils.encode(req); + } catch (IOException e) { + logger.error("Meets Exception when serializing buffer", e); + return; + } + TSInsertRecordsReqV2ColumnFormat request = new TSInsertRecordsReqV2ColumnFormat(); + request.setBuffer(buffer); + try { + defaultSessionConnection.insertRecords(request); + } catch (Exception e) { + logger.error("Meets exception when insert new records", e); + } } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 6cb618e8fa935..57286acd01c10 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -48,6 +48,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -162,6 +163,13 @@ private void init(TEndPoint endPoint, boolean useSSL, String trustStore, String session.connectionTimeoutInMs, trustStore, trustStorePwd); + } else if (session.enableRPCCompression) { + RpcTransportFactory.setUseSnappy(true); + RpcTransportFactory.reInit(); + transport = + RpcTransportFactory.INSTANCE.getTransport( + // as there is a try-catch already, we do not need to use TSocket.wrap + endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs); } else { transport = RpcTransportFactory.INSTANCE.getTransport( @@ -870,6 +878,15 @@ private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws TExcep return client.insertRecords(request); } + protected void insertRecords(TSInsertRecordsReqV2ColumnFormat request) throws TException { + request.setSessionId(sessionId); + try { + client.insertRecordsV2ColumnFormat(request); + } catch (TException e) { + throw e; + } + } + protected void insertRecords(TSInsertStringRecordsReq request) throws IoTDBConnectionException, StatementExecutionException, RedirectException { diff --git a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index e6ee742796e44..60206138c5ab9 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710 # this feature is under development, set this as false before it is done. # dn_rpc_advanced_compression_enable=false +# dn_data_transfer_compression_enable=false + # Datatype: int # dn_rpc_selector_thread_count=1 diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 854401d7b5e35..ae9942f1607cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -120,6 +120,8 @@ public class IoTDBConfig { /** whether to use Snappy compression before sending data through the network */ private boolean rpcAdvancedCompressionEnable = false; + private boolean dataTransportCompressionEnable = false; + /** Port which the JDBC server listens to. */ private int rpcPort = 6667; @@ -1058,6 +1060,8 @@ public class IoTDBConfig { /** whether the local write api records audit logs * */ private boolean enableAuditLogForNativeInsertApi = true; + private boolean enableWALCompression = true; + // customizedProperties, this should be empty by default. private Properties customizedProperties = new Properties(); @@ -2589,6 +2593,14 @@ public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable); } + public boolean isDataTransportCompressionEnable() { + return dataTransportCompressionEnable; + } + + public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) { + this.dataTransportCompressionEnable = dataTransportCompressionEnable; + } + public int getMlogBufferSize() { return mlogBufferSize; } @@ -3801,4 +3813,12 @@ public void setInnerCompactionTaskSelectionDiskRedundancy( double innerCompactionTaskSelectionDiskRedundancy) { this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy; } + + public boolean isEnableWALCompression() { + return enableWALCompression; + } + + public void setEnableWALCompression(boolean enableWALCompression) { + this.enableWALCompression = enableWALCompression; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e4c5c243598aa..b6880715dfec5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -234,6 +234,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO Boolean.toString(conf.isRpcAdvancedCompressionEnable())) .trim())); + conf.setDataTransportCompressionEnable( + Boolean.parseBoolean( + properties + .getProperty( + "dn_data_transfer_compression_enable", + Boolean.toString(conf.isDataTransportCompressionEnable())) + .trim())); + conf.setConnectionTimeoutInMS( Integer.parseInt( properties @@ -434,6 +442,11 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO "enable_seq_space_compaction", Boolean.toString(conf.isEnableSeqSpaceCompaction())))); + conf.setEnableWALCompression( + Boolean.parseBoolean( + properties.getProperty( + "enable_wal_compression", Boolean.toString(conf.isEnableWALCompression())))); + conf.setEnableUnseqSpaceCompaction( Boolean.parseBoolean( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 27bda2168f809..1129bc436fb54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -142,6 +142,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReqV2ColumnFormat; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq; import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; @@ -181,6 +182,8 @@ import io.jsonwebtoken.lang.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; +import org.mine.rpc.InsertRecordsReq; +import org.mine.rpc.InsertRecordsSerializeInColumnUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1643,6 +1646,7 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { // Step 1: transfer from TSInsertRecordsReq to Statement InsertRowsStatement statement = StatementGenerator.createStatement(req); + req = null; // return success when this statement is empty because server doesn't need to execute it if (statement.isEmpty()) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); @@ -1694,6 +1698,33 @@ public TSStatus insertRecords(TSInsertRecordsReq req) { } } + @Override + public TSStatus insertRecordsV2ColumnFormat(TSInsertRecordsReqV2ColumnFormat req) { + byte[] buffer = req.getBuffer(); + if (buffer == null) { + return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, "Buffer is null"); + } + try { + InsertRecordsReq originalReq = + InsertRecordsSerializeInColumnUtils.decode(ByteBuffer.wrap(buffer)); + InsertRowsStatement statement = StatementGenerator.createStatement(originalReq); + // originalReq = null; + long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.execute( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), + "", + partitionFetcher, + schemaFetcher); + return result.status; + } catch (Exception e) { + LOGGER.error("Meet error while inserting will new request", e); + return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, e.getMessage()); + } + } + @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) { long t1 = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index 3a7ac2c0cbea1..830228246d40d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -100,6 +100,7 @@ import org.antlr.v4.runtime.CommonTokenStream; import org.antlr.v4.runtime.atn.PredictionMode; import org.antlr.v4.runtime.tree.ParseTree; +import org.mine.rpc.InsertRecordsReq; import java.nio.ByteBuffer; import java.time.ZoneId; @@ -408,6 +409,28 @@ public static InsertRowsStatement createStatement(TSInsertRecordsReq req) return insertStatement; } + public static InsertRowsStatement createStatement(InsertRecordsReq req) + throws IllegalPathException { + final long startTime = System.nanoTime(); + // construct insert statement + InsertRowsStatement insertStatement = new InsertRowsStatement(); + List insertRowStatementList = new ArrayList<>(); + List devicePath = req.getPrefixPath(); + for (int i = 0, size = devicePath.size(); i < size; ++i) { + InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(DEVICE_PATH_CACHE.getPartialPath(devicePath.get(i))); + statement.setMeasurements(req.getMeasurements().get(i).toArray(new String[0])); + TimestampPrecisionUtils.checkTimestampPrecision(req.getTimestamp().get(i)); + statement.setTime(req.getTimestamp().get(i)); + statement.setValues(req.getValues().get(i).toArray(new Object[0])); + statement.setDataTypes(req.getTypes().get(i).toArray(new TSDataType[0])); + insertRowStatementList.add(statement); + } + insertStatement.setInsertRowStatementList(insertRowStatementList); + PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); + return insertStatement; + } + public static InsertRowsStatement createStatement(TSInsertStringRecordsReq req) throws IllegalPathException { final long startTime = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java index af747cb1dd09c..0efc54c6eb57d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -85,7 +85,7 @@ public void initThriftServiceThread() throws IllegalAccessException { config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), new RPCServiceThriftHandler(impl), - IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); + config.isDataTransportCompressionEnable()); } } catch (RPCServiceException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java index 5d2bad0a87407..578ab21ae8c7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/CheckpointReader.java @@ -24,10 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -47,8 +45,7 @@ public CheckpointReader(File logFile) { private void init() { checkpoints = new ArrayList<>(); - try (DataInputStream logStream = - new DataInputStream(new BufferedInputStream(new FileInputStream(logFile)))) { + try (DataInputStream logStream = new DataInputStream(new WALInputStream(logFile))) { maxMemTableId = logStream.readLong(); while (logStream.available() > 0) { Checkpoint checkpoint = Checkpoint.deserialize(logStream); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java index 68f4deae31894..05e30f3a54728 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/LogWriter.java @@ -19,8 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.io; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.Checkpoint; +import org.apache.iotdb.tsfile.compress.ICompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,18 +47,51 @@ public abstract class LogWriter implements ILogWriter { protected final FileOutputStream logStream; protected final FileChannel logChannel; protected long size; + protected boolean isEndFile = false; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1); + private final ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4); + private final ByteBuffer compressedByteBuffer; protected LogWriter(File logFile) throws FileNotFoundException { this.logFile = logFile; this.logStream = new FileOutputStream(logFile, true); this.logChannel = this.logStream.getChannel(); + if (IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression()) { + compressedByteBuffer = + ByteBuffer.allocate( + compressor.getMaxBytesForCompression( + IoTDBDescriptor.getInstance().getConfig().getWalBufferSize())); + } else { + compressedByteBuffer = null; + } } @Override public void write(ByteBuffer buffer) throws IOException { - size += buffer.position(); + int bufferSize = buffer.position(); buffer.flip(); + boolean compressed = false; + int uncompressedSize = bufferSize; + if (!isEndFile && IoTDBDescriptor.getInstance().getConfig().isEnableWALCompression() + /* && bufferSize > 1024 * 512 Do not compress buffer that is less than 512KB */) { + compressedByteBuffer.clear(); + compressor.compress(buffer, compressedByteBuffer); + buffer = compressedByteBuffer; + bufferSize = buffer.position(); + buffer.flip(); + compressed = true; + } + size += bufferSize; + headerBuffer.clear(); + headerBuffer.putInt(bufferSize); + headerBuffer.put((byte) (compressed ? 1 : 0)); try { + logger.error("Channel's offset is {}", logChannel.position()); + if (compressed) { + headerBuffer.putInt(uncompressedSize); + } + headerBuffer.flip(); + logChannel.write(headerBuffer); logChannel.write(buffer); } catch (ClosedChannelException e) { logger.warn("Cannot write to {}", logFile, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java index f101eaf3647e5..2c010102fb003 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALByteBufReader.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; import java.io.Closeable; +import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -36,6 +37,7 @@ public class WALByteBufReader implements Closeable { private final File logFile; private final FileChannel channel; + private final DataInputStream logStream; private final WALMetaData metaData; private final Iterator sizeIterator; @@ -46,6 +48,7 @@ public WALByteBufReader(File logFile) throws IOException { public WALByteBufReader(File logFile, FileChannel channel) throws IOException { this.logFile = logFile; this.channel = channel; + this.logStream = new DataInputStream(new WALInputStream(logFile)); this.metaData = WALMetaData.readFromWALFile(logFile, channel); this.sizeIterator = metaData.getBuffersSize().iterator(); channel.position(0); @@ -62,10 +65,10 @@ public boolean hasNext() { * @throws IOException when failing to read from channel. */ public ByteBuffer next() throws IOException { - int size = sizeIterator.next(); + int size = sizeIterator.next(); // size of wal entry before compression ByteBuffer buffer = ByteBuffer.allocate(size); - channel.read(buffer); - buffer.clear(); + logStream.readFully(buffer.array(), 0, size); + buffer.flip(); return buffer; } @@ -82,3 +85,89 @@ public long getFirstSearchIndex() { return metaData.getFirstSearchIndex(); } } + + +///* +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// */ +// +//package org.apache.iotdb.db.storageengine.dataregion.wal.io; +// +//import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry; +// +//import java.io.Closeable; +//import java.io.File; +//import java.io.IOException; +//import java.nio.ByteBuffer; +//import java.nio.channels.FileChannel; +//import java.nio.file.StandardOpenOption; +//import java.util.Iterator; +// +///** +// * This reader returns {@link WALEntry} as {@link ByteBuffer}, the usage of WALByteBufReader is like +// * {@link Iterator}. +// */ +//public class WALByteBufReader implements Closeable { +// private final File logFile; +// private final FileChannel channel; +// private final WALMetaData metaData; +// private final Iterator sizeIterator; +// +// public WALByteBufReader(File logFile) throws IOException { +// this(logFile, FileChannel.open(logFile.toPath(), StandardOpenOption.READ)); +// } +// +// public WALByteBufReader(File logFile, FileChannel channel) throws IOException { +// this.logFile = logFile; +// this.channel = channel; +// this.metaData = WALMetaData.readFromWALFile(logFile, channel); +// this.sizeIterator = metaData.getBuffersSize().iterator(); +// channel.position(0); +// } +// +// /** Like {@link Iterator#hasNext()}. */ +// public boolean hasNext() { +// return sizeIterator.hasNext(); +// } +// +// /** +// * Like {@link Iterator#next()}. +// * +// * @throws IOException when failing to read from channel. +// */ +// public ByteBuffer next() throws IOException { +// int size = sizeIterator.next(); +// ByteBuffer buffer = ByteBuffer.allocate(size); +// channel.read(buffer); +// buffer.clear(); +// return buffer; +// } +// +// public WALMetaData getMetaData() { +// return metaData; +// } +// +// @Override +// public void close() throws IOException { +// channel.close(); +// } +// +// public long getFirstSearchIndex() { +// return metaData.getFirstSearchIndex(); +// } +//} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java new file mode 100644 index 0000000000000..d9f39b933d001 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALInputStream.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.storageengine.dataregion.wal.io; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Objects; + +public class WALInputStream extends InputStream implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class); + private final FileChannel channel; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1); + private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES); + private ByteBuffer dataBuffer = + ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()); // uncompressed data buffer + + public WALInputStream(File logFile) throws IOException { + channel = FileChannel.open(logFile.toPath()); + } + + @Override + public int read() throws IOException { + if (Objects.isNull(dataBuffer) || dataBuffer.position() == dataBuffer.limit()) { + loadNextSegment(); + } + return dataBuffer.get() & 0xFF; + } + + @Override + public void close() throws IOException { + channel.close(); + dataBuffer = null; + } + + @Override + public int available() throws IOException { + return (int) (channel.size() - channel.position()); + } + + private void loadNextSegment() throws IOException { + headerBuffer.clear(); + if (channel.read(headerBuffer) != Integer.BYTES + 1) { + throw new IOException("Unexpected end of file"); + } + headerBuffer.flip(); + int dataSize = headerBuffer.getInt(); + boolean isCompressed = headerBuffer.get() == 1; + if (isCompressed) { + compressedHeader.clear(); + if (channel.read(compressedHeader) != Integer.BYTES) { + throw new IOException("Unexpected end of file"); + } + compressedHeader.flip(); + int uncompressedSize = compressedHeader.getInt(); + if (uncompressedSize > dataBuffer.capacity()) { + // enlarge buffer + dataBuffer = ByteBuffer.allocateDirect(uncompressedSize); + } + ByteBuffer compressedData = ByteBuffer.allocateDirect(dataSize); + if (channel.read(compressedData) != dataSize) { + throw new IOException("Unexpected end of file"); + } + compressedData.flip(); + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4); + dataBuffer.clear(); + unCompressor.uncompress(compressedData, dataBuffer); + } else { + dataBuffer = ByteBuffer.allocateDirect(dataSize); + if (channel.read(dataBuffer) != dataSize) { + throw new IOException("Unexpected end of file"); + } + } + dataBuffer.flip(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java index ee50c73df9722..475ea2b0b2d8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALReader.java @@ -26,12 +26,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.Iterator; import java.util.NoSuchElementException; @@ -57,9 +55,7 @@ public WALReader(File logFile) throws IOException { public WALReader(File logFile, boolean fileMayCorrupt) throws IOException { this.logFile = logFile; this.fileMayCorrupt = fileMayCorrupt; - this.logStream = - new DataInputStream( - new BufferedInputStream(Files.newInputStream(logFile.toPath()), STREAM_BUFFER_SIZE)); + this.logStream = new DataInputStream(new WALInputStream(logFile)); } /** Like {@link Iterator#hasNext()}. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java index 425fc676fad8c..20ae99754505c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/io/WALWriter.java @@ -59,6 +59,7 @@ public void updateMetaData(WALMetaData metaData) { } private void endFile() throws IOException { + this.isEndFile = true; WALSignalEntry endMarker = new WALSignalEntry(WALEntryType.WAL_FILE_INFO_END_MARKER); int metaDataSize = metaData.serializedSize(); ByteBuffer buffer = @@ -72,6 +73,7 @@ private void endFile() throws IOException { // add magic string buffer.put(MAGIC_STRING.getBytes()); write(buffer); + this.isEndFile = false; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java index 18c9cf79e6b12..7b6b7bae1b1a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALWriteUtils.java @@ -125,10 +125,12 @@ public static int write(String s, IWALByteBufferView buffer) { return write(NO_BYTE_TO_READ, buffer); } int len = 0; - byte[] bytes = s.getBytes(); - len += write(bytes.length, buffer); - buffer.put(bytes); - len += bytes.length; + len += write(s.length(), buffer); + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + buffer.put((byte) c); // ascii only + } + len += s.length(); return len; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index 12eea072f3224..794a7a2d72f46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -23,6 +23,9 @@ import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.runtime.RPCServiceException; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport; import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TProcessor; @@ -208,6 +211,12 @@ protected AbstractThriftServiceThread( serverTransport = openTransport(bindAddress, port); TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); + if (compress) { + poolArgs.transportFactory( + new RpcTransportFactory( + new TimeoutChangeableTSnappyFramedTransport.Factory( + RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE))); + } poolServer = new TThreadPoolServer(poolArgs); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) { diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2b2f558572670..6dbb05a5654ec 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -255,6 +255,11 @@ struct TSInsertRecordsReq { 6: optional bool isAligned } +struct TSInsertRecordsReqV2ColumnFormat { + 1: required i64 sessionId + 2: required binary buffer +} + struct TSInsertRecordsOfOneDeviceReq { 1: required i64 sessionId 2: required string prefixPath @@ -583,6 +588,8 @@ service IClientRPCService { common.TSStatus insertRecords(1:TSInsertRecordsReq req); + common.TSStatus insertRecordsV2ColumnFormat(1:TSInsertRecordsReqV2ColumnFormat req); + common.TSStatus insertRecordsOfOneDevice(1:TSInsertRecordsOfOneDeviceReq req); common.TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req);