Skip to content

Commit

Permalink
fix review and add some test
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Dec 27, 2019
1 parent 072877c commit 4d86120
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,18 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
if (tsFileResource.isClosed()) {
// get chunk metadata list of current closed tsfile
currentChunkMetaDataList = DeviceMetaDataCache.getInstance().get(tsFileResource, seriesPath);

// get modifications and apply to chunk metadatas
List<Modification> pathModifications = context
.getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
}
} else {
// metadata list of already flushed chunks
// metadata list of already flushed chunks in unsealed file, already applied modifications
currentChunkMetaDataList = tsFileResource.getChunkMetaDataList();
}

// get modifications and apply to chunk metadatas
List<Modification> pathModifications = context
.getPathModifications(tsFileResource.getModFile(), seriesPath.getFullPath());
if (!pathModifications.isEmpty()) {
QueryUtils.modifyChunkMetaData(currentChunkMetaDataList, pathModifications);
}

if (!currentChunkMetaDataList.isEmpty()) {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
.get(tsFileResource, tsFileResource.isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@
import static org.junit.Assert.assertFalse;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.storageGroup.StorageGroupProcessorException;
import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.TimeValuePair;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.junit.After;
Expand Down Expand Up @@ -71,6 +77,43 @@ public void tearDown() throws Exception {
}


@Test
public void testUnseqUnsealedDelete() throws QueryProcessException, IOException {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
processor.waitForAllCurrentTsFileProcessorsClosed();


for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}

processor.getWorkUnSequenceTsFileProcessor().syncFlush();

for (int j = 11; j <= 20; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
}

processor.delete(deviceId, measurementId, 15L);

Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor.getWorkUnSequenceTsFileProcessor()
.query(deviceId, measurementId, TSDataType.INT32, Collections.emptyMap(), new QueryContext());

List<TimeValuePair> timeValuePairs = pair.left.getSortedTimeValuePairList();

long time = 16;
for (TimeValuePair timeValuePair : timeValuePairs) {
Assert.assertEquals(time++, timeValuePair.getTimestamp());
}

Assert.assertEquals(0, pair.right.size());
}

@Test
public void testSequenceSyncClose() throws QueryProcessException {
for (int j = 1; j <= 10; j++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.apache.iotdb.db.integration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class IoTDBSimpleQueryTest {
private IoTDB deamon;

@Before
public void setUp() throws Exception {
deamon = IoTDB.getInstance();
deamon.active();
EnvironmentUtils.envSetUp();
}

@After
public void tearDown() throws Exception {
deamon.stop();
EnvironmentUtils.cleanEnv();
}

@Test
public void testUnseqUnsealedDeleteQuery() throws SQLException, ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try(Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
"root", "root");
Statement statement = connection.createStatement()){
statement.execute("SET STORAGE GROUP TO root.sg1");
statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN");

// seq data
statement.execute("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (1000, 1)");
statement.execute("flush");

for (int i = 1; i <= 10; i++) {
statement.execute(
String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i));
}

statement.execute("flush");

// unseq data
for (int i = 11; i <= 20; i++) {
statement.execute(
String.format("INSERT INTO root.sg1.d0(timestamp, s0) VALUES (%d, %d)", i, i));
}

statement.execute("delete from root.sg1.d0.s0 where time <= 15");

ResultSet resultSet = statement.executeQuery("select * from root");

long count = 0;

while(resultSet.next()) {
count++;
}

System.out.println(count);

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void test(TSDataType dataType) {
int index = 0;
long startTimestamp = System.currentTimeMillis();
BatchData data = pageReader.getAllSatisfiedPageData();
assert data != null;
Assert.assertNotNull(data);

while (data.hasCurrent()) {
Assert.assertEquals(Long.valueOf(index), (Long) data.currentTime());
Expand Down

0 comments on commit 4d86120

Please sign in to comment.