Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private SchemaUtils() {}
Set<TSEncoding> intSet = new HashSet<>();
intSet.add(TSEncoding.PLAIN);
intSet.add(TSEncoding.RLE);
intSet.add(TSEncoding.DIFF);
intSet.add(TSEncoding.TS_2DIFF);
intSet.add(TSEncoding.GORILLA);
schemaChecker.put(TSDataType.INT32, intSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,63 @@ public void testSetTimeEncoderRegularAndValueEncoderTS_2DIFFOutofOrder() {
}
}

@Test
public void testSetTimeEncoderRegularAndValueEncoderDIFF() {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TIMESERIES root.db_0.tab0.salary WITH DATATYPE=INT64,ENCODING=DIFF");
statement.execute("insert into root.db_0.tab0(time,salary) values(1,1100)");
statement.execute("insert into root.db_0.tab0(time,salary) values(2,1200)");
statement.execute("insert into root.db_0.tab0(time,salary) values(3,1300)");
statement.execute("insert into root.db_0.tab0(time,salary) values(4,1400)");

statement.execute("flush");

int[] result = new int[] {1100, 1200, 1300, 1400};
try (ResultSet resultSet = statement.executeQuery("select * from root.db_0.tab0")) {
int index = 0;
while (resultSet.next()) {
int salary = resultSet.getInt("root.db_0.tab0.salary");
assertEquals(result[index], salary);
index++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testSetTimeEncoderRegularAndValueEncoderDIFFOutofOrder() {
try (Connection connection =
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE TIMESERIES root.db_0.tab0.salary WITH DATATYPE=INT32,ENCODING=DIFF");
statement.execute("insert into root.db_0.tab0(time,salary) values(1,1200)");
statement.execute("insert into root.db_0.tab0(time,salary) values(2,1100)");
statement.execute("insert into root.db_0.tab0(time,salary) values(7,1000)");
statement.execute("insert into root.db_0.tab0(time,salary) values(4,2200)");
statement.execute("flush");

int[] result = new int[] {1200, 1100, 2200, 1000};
try (ResultSet resultSet = statement.executeQuery("select * from root.db_0.tab0")) {
int index = 0;
while (resultSet.next()) {
int salary = resultSet.getInt("root.db_0.tab0.salary");
assertEquals(result[index], salary);
index++;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testSetTimeEncoderRegularAndValueEncoderRLE() {
try (Connection connection =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ public static Decoder getDecoderByType(TSEncoding encoding, TSDataType dataType)
}
case DICTIONARY:
return new DictionaryDecoder();
case DIFF:
switch (dataType) {
case INT32:
return new DiffDecoder.IntDeltaDecoder();
case INT64:
case VECTOR:
return new DiffDecoder.LongDeltaDecoder();
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
}
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding, dataType));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.tsfile.encoding.decoder;

import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* This class is a decoder for decoding the byte array that encoded by {@code DiffEncoder}.
* DiffDecoder just supports integer and long values.<br>
*
* @see org.apache.iotdb.tsfile.encoding.encoder.DiffEncoder
*/
public abstract class DiffDecoder extends Decoder {

protected byte[] deltaBuf;

/** the first value in one pack. */
protected int readIntTotalCount = 0;

protected int nextReadIndex = 0;
/** max bit length of all value in a pack. */
protected int packWidth;
/** data number in this pack. */
protected int packNum;

/** how many bytes data takes after encoding. */
protected int encodingLength;

public DiffDecoder() {
super(TSEncoding.DIFF);
}

protected abstract void readHeader(ByteBuffer buffer) throws IOException;

protected abstract void allocateDataArray();

protected abstract void readValue(int i);

/**
* calculate the bytes length containing v bits.
*
* @param v - number of bits
* @return number of bytes
*/
protected int ceil(int v) {
return (int) Math.ceil((double) (v) / 8.0);
}

@Override
public boolean hasNext(ByteBuffer buffer) throws IOException {
return (nextReadIndex < readIntTotalCount) || buffer.remaining() > 0;
}

public static class IntDeltaDecoder extends DiffDecoder {

private int firstValue;
private int[] data;
private int previous;

public IntDeltaDecoder() {
super();
}

/**
* if there's no decoded data left, decode next pack into {@code data}.
*
* @param buffer ByteBuffer
* @return int
*/
protected int readT(ByteBuffer buffer) {
if (nextReadIndex == readIntTotalCount) {
return loadIntBatch(buffer);
}
return data[nextReadIndex++];
}

@Override
public int readInt(ByteBuffer buffer) {
return readT(buffer);
}

/**
* if remaining data has been run out, load next pack from InputStream.
*
* @param buffer ByteBuffer
* @return int
*/
protected int loadIntBatch(ByteBuffer buffer) {
packNum = ReadWriteIOUtils.readInt(buffer);
packWidth = ReadWriteIOUtils.readInt(buffer);

readHeader(buffer);

encodingLength = ceil(packNum * packWidth);
deltaBuf = new byte[encodingLength];
buffer.get(deltaBuf);
allocateDataArray();

previous = firstValue;
readIntTotalCount = packNum;
nextReadIndex = 0;
readPack();
return firstValue;
}

private void readPack() {
for (int i = 0; i < packNum; i++) {
readValue(i);
previous = data[i];
}
}

@Override
protected void readHeader(ByteBuffer buffer) {
firstValue = ReadWriteIOUtils.readInt(buffer);
}

@Override
protected void allocateDataArray() {
data = new int[packNum];
}

@Override
protected void readValue(int i) {
int v = BytesUtils.bytesToInt(deltaBuf, packWidth * i, packWidth);
data[i] = previous + v;
}

@Override
public void reset() {
// do nothing
}
}

public static class LongDeltaDecoder extends DiffDecoder {

private long firstValue;
private long[] data;
private long previous;

public LongDeltaDecoder() {
super();
}

/**
* if there's no decoded data left, decode next pack into {@code data}.
*
* @param buffer ByteBuffer
* @return long value
*/
protected long readT(ByteBuffer buffer) {
if (nextReadIndex == readIntTotalCount) {
return loadIntBatch(buffer);
}
return data[nextReadIndex++];
}

/**
* if remaining data has been run out, load next pack from InputStream.
*
* @param buffer ByteBuffer
* @return long value
*/
protected long loadIntBatch(ByteBuffer buffer) {
packNum = ReadWriteIOUtils.readInt(buffer);
packWidth = ReadWriteIOUtils.readInt(buffer);

readHeader(buffer);

encodingLength = ceil(packNum * packWidth);
deltaBuf = new byte[encodingLength];
buffer.get(deltaBuf);
allocateDataArray();

previous = firstValue;
readIntTotalCount = packNum;
nextReadIndex = 0;
readPack();
return firstValue;
}

private void readPack() {
for (int i = 0; i < packNum; i++) {
readValue(i);
previous = data[i];
}
}

@Override
public long readLong(ByteBuffer buffer) {

return readT(buffer);
}

@Override
protected void readHeader(ByteBuffer buffer) {
firstValue = ReadWriteIOUtils.readLong(buffer);
}

@Override
protected void allocateDataArray() {
data = new long[packNum];
}

@Override
protected void readValue(int i) {
long v = BytesUtils.bytesToLong(deltaBuf, packWidth * i, packWidth);
data[i] = previous + v;
}

@Override
public void reset() {
// do nothing
}
}
}
Loading