Skip to content

Commit

Permalink
use bloomfilter in TimeseriesMetadataCache
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Mar 30, 2020
1 parent e653ab3 commit 3481c3b
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 61 deletions.
Expand Up @@ -84,25 +84,25 @@ public static ChunkMetadataCache getInstance() {
/**
* get {@link ChunkMetadata}. THREAD SAFE.
*/
public List<ChunkMetadata> get(TsFileResource resource, Path seriesPath)
public List<ChunkMetadata> get(String filePath, Path seriesPath)
throws IOException {
if (!cacheEnable) {
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(resource);
// bloom filter part
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(filePath);
BloomFilter bloomFilter = fileMetaData.getBloomFilter();
if (bloomFilter != null && !bloomFilter.contains(seriesPath.getFullPath())) {
if (logger.isDebugEnabled()) {
logger.debug(String
.format("path not found by bloom filter, file is: %s, path is: %s", resource.getFile(), seriesPath));
.format("path not found by bloom filter, file is: %s, path is: %s", filePath, seriesPath));
}
return new ArrayList<>();
}
// If timeseries isn't included in the tsfile, empty list is returned.
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(resource.getPath(), true);
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(filePath, true);
return tsFileReader.getChunkMetadataList(seriesPath);
}

String key = (resource.getPath() + IoTDBConstant.PATH_SEPARATOR
String key = (filePath + IoTDBConstant.PATH_SEPARATOR
+ seriesPath.getDevice() + seriesPath.getMeasurement()).intern();

cacheRequestNum.incrementAndGet();
Expand All @@ -126,14 +126,14 @@ public List<ChunkMetadata> get(TsFileResource resource, Path seriesPath)
return new ArrayList<>(lruCache.get(key));
}
printCacheLog(false);
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(resource);
// bloom filter part
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(filePath);
BloomFilter bloomFilter = fileMetaData.getBloomFilter();
if (bloomFilter != null && !bloomFilter.contains(seriesPath.getFullPath())) {
return new ArrayList<>();
}
List<ChunkMetadata> chunkMetaDataList = FileLoaderUtils
.getChunkMetadataList(seriesPath, resource);
.getChunkMetadataList(seriesPath, filePath);
lruCache.put(key, chunkMetaDataList);
return chunkMetaDataList;
} finally {
Expand Down
Expand Up @@ -20,13 +20,15 @@
package org.apache.iotdb.db.engine.cache;

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -82,6 +84,13 @@ public static TimeSeriesMetadataCache getInstance() {

public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSensors) throws IOException {
if (!cacheEnable) {
// bloom filter part
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(key.filePath);
BloomFilter bloomFilter = fileMetaData.getBloomFilter();
if (bloomFilter != null && !bloomFilter
.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
return null;
}
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
return reader.readDeviceMetadata(key.device).get(key.measurement);
}
Expand All @@ -107,6 +116,13 @@ public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSen
return lruCache.get(key);
}
printCacheLog(false);
// bloom filter part
TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(key.filePath);
BloomFilter bloomFilter = fileMetaData.getBloomFilter();
if (bloomFilter != null && !bloomFilter
.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) {
return null;
}
TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
Map<String, TimeseriesMetadata> timeSeriesMetadataMap = reader.readDeviceMetadata(key.device);
TimeseriesMetadata res = timeSeriesMetadataMap.get(key.measurement);
Expand Down
Expand Up @@ -90,37 +90,36 @@ public static TsFileMetaDataCache getInstance() {
/**
* get the TsFileMetaData for given TsFile.
*
* @param tsFileResource -given TsFile
* @param filePath -given TsFile
*/
public TsFileMetadata get(TsFileResource tsFileResource) throws IOException {
public TsFileMetadata get(String filePath) throws IOException {
if (!cacheEnable) {
return FileLoaderUtils.getTsFileMetadata(tsFileResource);
return FileLoaderUtils.getTsFileMetadata(filePath);
}

String path = tsFileResource.getPath().intern();
cacheRequestNum.incrementAndGet();

lock.readLock().lock();
try {
if (cache.containsKey(path)) {
if (cache.containsKey(filePath)) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
return cache.get(path);
return cache.get(filePath);
}
} finally {
lock.readLock().unlock();
}

lock.writeLock().lock();
try {
if (cache.containsKey(tsFileResource.getPath())) {
if (cache.containsKey(filePath)) {
cacheHitNum.incrementAndGet();
printCacheLog(true);
return cache.get(tsFileResource.getPath());
return cache.get(filePath);
}
printCacheLog(false);
TsFileMetadata fileMetaData = FileLoaderUtils.getTsFileMetadata(tsFileResource);
cache.put(tsFileResource.getPath(), fileMetaData);
TsFileMetadata fileMetaData = FileLoaderUtils.getTsFileMetadata(filePath);
cache.put(filePath, fileMetaData);
return fileMetaData;
} finally {
lock.writeLock().unlock();
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand Down
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;

import java.util.*;

public class RawDataQueryPlan extends QueryPlan {
Expand Down
Expand Up @@ -21,15 +21,13 @@

import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;

import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
Expand All @@ -45,7 +43,6 @@
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;

import java.io.IOException;
import java.util.*;

Expand All @@ -66,9 +63,8 @@ public LastQueryExecutor(LastQueryPlan lastQueryPlan) {
public QueryDataSet execute(QueryContext context)
throws StorageEngineException, IOException, QueryProcessException {

ListDataSet dataSet =
new ListDataSet(
Arrays.asList(new Path(COLUMN_TIMESERIES), new Path(COLUMN_VALUE)),
ListDataSet dataSet = new ListDataSet(
Arrays.asList(new Path(COLUMN_TIMESERIES), new Path(COLUMN_VALUE)),
Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));

for (int i = 0; i < selectedSeries.size(); i++) {
Expand Down Expand Up @@ -122,14 +118,12 @@ private TimeValuePair calculateLastPairForOneSeries(

if (!seqFileResources.isEmpty()) {
for (int i = seqFileResources.size() - 1; i >= 0; i--) {
List<ChunkMetadata> chunkMetadata =
FileLoaderUtils.loadChunkMetadataFromTsFileResource(
List<ChunkMetadata> chunkMetadata = FileLoaderUtils.loadChunkMetadataFromTsFileResource(
seqFileResources.get(i), seriesPath, context);
if (!chunkMetadata.isEmpty()) {
ChunkMetadata lastChunkMetaData = chunkMetadata.get(chunkMetadata.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair =
constructLastPair(
resultPair = constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
break;
}
Expand All @@ -147,8 +141,7 @@ private TimeValuePair calculateLastPairForOneSeries(
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
resultPair =
constructLastPair(
resultPair = constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
version = chunkMetaData.getVersion();
}
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;

import java.io.IOException;
import java.util.List;

Expand All @@ -51,7 +50,7 @@ public DiskChunkMetadataLoader(TsFileResource resource, Path seriesPath, QueryCo
@Override
public List<ChunkMetadata> getChunkMetadataList() throws IOException {
List<ChunkMetadata> chunkMetadataList = ChunkMetadataCache
.getInstance().get(resource, seriesPath);
.getInstance().get(resource.getPath(), seriesPath);

setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context);

Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -128,7 +127,7 @@ public static List<ChunkMetadata> loadChunkMetadataFromTsFileResource(
return new ArrayList<>();
}
if (resource.isClosed()) {
chunkMetadataList = ChunkMetadataCache.getInstance().get(resource, seriesPath);
chunkMetadataList = ChunkMetadataCache.getInstance().get(resource.getPath(), seriesPath);
} else {
chunkMetadataList = resource.getChunkMetadataList();
}
Expand Down Expand Up @@ -161,19 +160,13 @@ public static List<ChunkMetadata> loadChunkMetadataFromTsFileResource(
return chunkMetadataList;
}

public static List<ChunkMetadata> getChunkMetadataList(Path path, TsFileResource resource) throws IOException {
if (!resource.isClosed()) {
throw new IOException("The TsFile is not closed: " + resource.getFile().getAbsolutePath());
}
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(resource.getPath(), true);
public static List<ChunkMetadata> getChunkMetadataList(Path path, String filePath) throws IOException {
TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(filePath, true);
return tsFileReader.getChunkMetadataList(path);
}

public static TsFileMetadata getTsFileMetadata(TsFileResource resource) throws IOException {
if (!resource.isClosed()) {
throw new IOException("The TsFile is not closed: " + resource.getFile().getAbsolutePath());
}
TsFileSequenceReader reader = FileReaderManager.getInstance().get(resource.getPath(), true);
public static TsFileMetadata getTsFileMetadata(String filePath) throws IOException {
TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
return reader.readFileMetadata();
}
}
Expand Up @@ -67,16 +67,6 @@ public void setUp() throws Exception {
MetadataManagerHelper.initMetadata();
ActiveTimeSeriesCounter.getInstance().init(storageGroup);
storageGroupProcessor = new StorageGroupProcessor(systemDir, storageGroup, new DirectFlushPolicy());
// MManager.getInstance().createTimeseries(storageGroup + "." + measurementId0,
// new MeasurementSchema(measurementId0, TSDataType.INT32, TSEncoding.PLAIN));
// MManager.getInstance().createTimeseries(storageGroup + "." + measurementId0,
// new MeasurementSchema(measurementId0, TSDataType.INT64, TSEncoding.PLAIN));
// MManager.getInstance().createTimeseries(storageGroup + "." + measurementId0,
// new MeasurementSchema(measurementId0, TSDataType.FLOAT, TSEncoding.PLAIN));
// MManager.getInstance().createTimeseries(storageGroup + "." + measurementId0,
// new MeasurementSchema(measurementId0, TSDataType.DOUBLE, TSEncoding.PLAIN));
// MManager.getInstance().createTimeseries(storageGroup + "." + measurementId0,
// new MeasurementSchema(measurementId0, TSDataType.BOOLEAN, TSEncoding.PLAIN));
insertData();
}

Expand Down Expand Up @@ -145,7 +135,7 @@ public void test1() throws IOException, QueryProcessException {
Assert.assertFalse(unseqResources.get(3).isClosed());

List<ChunkMetadata> metaDataList = ChunkMetadataCache.getInstance()
.get(seqResources.get(0), new Path(storageGroup, measurementId5));
.get(seqResources.get(0).getPath(), new Path(storageGroup, measurementId5));
Assert.assertEquals(0, metaDataList.size());
}

Expand All @@ -167,7 +157,7 @@ public void test2() throws IOException, QueryProcessException {
Assert.assertFalse(unseqResources.get(3).isClosed());

List<ChunkMetadata> metaDataList = ChunkMetadataCache.getInstance()
.get(seqResources.get(0), new Path(storageGroup, measurementId5));
.get(seqResources.get(0).getPath(), new Path(storageGroup, measurementId5));
Assert.assertEquals(0, metaDataList.size());
}

Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.iotdb.tsfile.utils.BloomFilter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand Down

0 comments on commit 3481c3b

Please sign in to comment.