Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-144]meta data cache for query #262

Merged
merged 8 commits into from Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
Expand Down Expand Up @@ -241,6 +242,15 @@ public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryConte
return storageGroupProcessor.query(deviceId, measurementId, context);
}

/**
* returns the top k% measurements that are most frequently used in queries.
*/
public Set calTopKMeasurement(String deviceId, String sensorId, double k)
throws StorageEngineException {
StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
return storageGroupProcessor.calTopKMeasurement(sensorId, k);
}

/**
* Append one specified tsfile to the storage group. <b>This method is only provided for
* transmission module</b>
Expand Down
Expand Up @@ -19,77 +19,144 @@
package org.apache.iotdb.db.engine.cache;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
import org.apache.iotdb.tsfile.read.common.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is used to cache <code>DeviceMetaDataCache</code> of tsfile in IoTDB.
* This class is used to cache <code>List<ChunkMetaData></code> of tsfile in IoTDB. The caching
* strategy is LRU.
*/
public class DeviceMetaDataCache {

private static final Logger logger = LoggerFactory.getLogger(DeviceMetaDataCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

private static final int CACHE_SIZE = 100;
private static StorageEngine storageEngine = StorageEngine.getInstance();

private static final long MEMORY_THRESHOLD_IN_B = (long) (0.3 * config
.getAllocateMemoryForRead());
/**
* key: the file path + deviceId.
* key: file path dot deviceId dot sensorId.
* <p>
* value: chunkMetaData list of one timeseries in the file.
*/
private LinkedHashMap<String, TsDeviceMetadata> lruCache;
private LruLinkedHashMap<String, List<ChunkMetaData>> lruCache;

private AtomicLong cacheHintNum = new AtomicLong();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hit or hint?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hit.It's a typo.

private AtomicLong cacheRequestNum = new AtomicLong();

private DeviceMetaDataCache(int cacheSize) {
lruCache = new LruLinkedHashMap(cacheSize, true);
/**
* approximate estimation of chunkMetaData size
*/
private long chunkMetaDataSize = 0;

private DeviceMetaDataCache(long memoryThreshold) {
lruCache = new LruLinkedHashMap<String, List<ChunkMetaData>>(memoryThreshold, true) {
@Override
protected long calEntrySize(String key, List<ChunkMetaData> value) {
if (chunkMetaDataSize == 0 && !value.isEmpty()) {
chunkMetaDataSize = RamUsageEstimator.sizeOf(value.get(0));
}
return value.size() * chunkMetaDataSize + key.length() * 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why +key.length() * 2 here?
It is ChunkMetaData not TsFileMetaData.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mistake. It is right.

}
};
}

public static DeviceMetaDataCache getInstance() {
return RowGroupBlockMetaDataCacheSingleton.INSTANCE;
}

/**
* get {@link TsDeviceMetadata}. THREAD SAFE.
* get {@link ChunkMetaData}. THREAD SAFE.
*/
public TsDeviceMetadata get(String filePath, String deviceId, TsFileMetaData fileMetaData)
public List<ChunkMetaData> get(String filePath, Path seriesPath)
throws IOException {
// The key(the tsfile path and deviceId) for the lruCache
StringBuilder builder = new StringBuilder(filePath).append(".").append(seriesPath.getDevice());
String pathDeviceStr = builder.toString();
String key = builder.append(".").append(seriesPath.getMeasurement()).toString();
Object devicePathObject = pathDeviceStr.intern();

String jointPath = filePath + deviceId;
Object jointPathObject = jointPath.intern();
synchronized (lruCache) {
cacheRequestNum.incrementAndGet();
if (lruCache.containsKey(jointPath)) {
if (lruCache.containsKey(key)) {
cacheHintNum.incrementAndGet();
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
if (logger.isDebugEnabled()) {
logger.debug(
"Cache hint: the number of requests for cache is {}, "
"Cache hit: the number of requests for cache is {}, "
+ "the number of hints for cache is {}",
cacheRequestNum.get(), cacheHintNum.get());
}
return lruCache.get(jointPath);
return new ArrayList<>(lruCache.get(key));
}
}
synchronized (jointPathObject) {
synchronized (devicePathObject) {
synchronized (lruCache) {
if (lruCache.containsKey(jointPath)) {
return lruCache.get(jointPath);
if (lruCache.containsKey(key)) {
cacheHintNum.incrementAndGet();
return new ArrayList<>(lruCache.get(key));
}
}
if (logger.isDebugEnabled()) {
logger.debug("Cache didn't hint: the number of requests for cache is {}",
logger.debug("Cache didn't hit: the number of requests for cache is {}",
cacheRequestNum.get());
}
TsFileMetaData fileMetaData = TsFileMetaDataCache.getInstance().get(filePath);
TsDeviceMetadata blockMetaData = TsFileMetadataUtils
.getTsRowGroupBlockMetaData(filePath, deviceId,
.getTsRowGroupBlockMetaData(filePath, seriesPath.getDevice(),
fileMetaData);
Map<Path, List<ChunkMetaData>> chunkMetaData = TsFileMetadataUtils
.getChunkMetaDataList(calHotSensorSet(seriesPath), blockMetaData);
synchronized (lruCache) {
lruCache.put(jointPath, blockMetaData);
return lruCache.get(jointPath);
chunkMetaData.forEach((path, chunkMetaDataList) -> {
String k = pathDeviceStr + "." + path.getMeasurement();
if (!lruCache.containsKey(k)) {
lruCache.put(k, chunkMetaDataList);
}
});
if (chunkMetaData.containsKey(seriesPath)) {
return new ArrayList<>(chunkMetaData.get(seriesPath));
}
return new ArrayList<>();
}
}
}

/**
* calculate the most frequently query sensors set.
*
* @param seriesPath the series to be queried in a query statements.
*/
private Set<String> calHotSensorSet(Path seriesPath) throws IOException {
double usedMemProportion = lruCache.getUsedMemoryProportion();

if (usedMemProportion < 0.6) {
return new HashSet<>();
} else {
double hotSensorProportion;
if (usedMemProportion < 0.8) {
hotSensorProportion = 0.1;
} else {
hotSensorProportion = 0.05;
jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
}
try {
return storageEngine
.calTopKMeasurement(seriesPath.getDevice(), seriesPath.getMeasurement(),
hotSensorProportion);
} catch (Exception e) {
throw new IOException(e);
}
}
}
Expand All @@ -104,64 +171,11 @@ public void clear() {
}

/**
* the default LRU cache size is 100. The singleton pattern.
* singleton pattern.
*/
private static class RowGroupBlockMetaDataCacheSingleton {

private static final DeviceMetaDataCache INSTANCE = new
DeviceMetaDataCache(CACHE_SIZE);
}

/**
* This class is a map used to cache the <code>RowGroupBlockMetaData</code>. The caching strategy
* is LRU.
*
*/
private class LruLinkedHashMap extends LinkedHashMap<String, TsDeviceMetadata> {

private static final long serialVersionUID = 1290160928914532649L;
private static final float LOAD_FACTOR_MAP = 0.75f;
private int maxCapacity;

public LruLinkedHashMap(int maxCapacity, boolean isLru) {
super(maxCapacity, LOAD_FACTOR_MAP, isLru);
this.maxCapacity = maxCapacity;
}

@Override
protected boolean removeEldestEntry(Map.Entry<String, TsDeviceMetadata> eldest) {
return size() > maxCapacity;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return super.equals(o);
}

@Override
public int hashCode() {
return super.hashCode();
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeviceMetaDataCache that = (DeviceMetaDataCache) o;
return Objects.equals(lruCache, that.lruCache) &&
Objects.equals(cacheHintNum, that.cacheHintNum) &&
Objects.equals(cacheRequestNum, that.cacheRequestNum);
}

@Override
public int hashCode() {
return Objects.hash(lruCache, cacheHintNum, cacheRequestNum);
DeviceMetaDataCache(MEMORY_THRESHOLD_IN_B);
}
}
}
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.cache;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* This class is an LRU cache. <b>Note: It's not thread safe.</b>
*/
public abstract class LruLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public abstract class LruLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
public abstract class LRULinkedHashMap<K, V> extends LinkedHashMap<K, V> {

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@qiaojialin actually I don't like the function suggested change. Look here, you changed the class name but forget to change the file name, which Suyue doesn't notice too.
I personally prefer you just point out where the problem is instead of changing it for her.


private static final long serialVersionUID = 1290160928914532649L;
private static final float LOAD_FACTOR_MAP = 0.75f;
private static final int INITIAL_CAPACITY = 128;
/**
* maximum memory threshold.
*/
private long maxMemInB;
/**
* current used memory.
*/
private long usedMemInB;

public LruLinkedHashMap(long maxMemInB, boolean isLru) {
super(INITIAL_CAPACITY, LOAD_FACTOR_MAP, isLru);
this.maxMemInB = maxMemInB;
}

@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (usedMemInB > maxMemInB) {
usedMemInB -= calEntrySize(eldest.getKey(), eldest.getValue());
return true;
} else {
return false;
}
}

@Override
public V put(K key, V value) {
usedMemInB += calEntrySize(key, value);
return super.put(key, value);
}

/**
* approximately estimate the additional size of key and value.
*/
protected abstract long calEntrySize(K key, V value);

/**
* calculate the proportion of used memory.
*/
public double getUsedMemoryProportion() {
return usedMemInB * 1.0 / maxMemInB;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return super.equals(o);
}

@Override
public int hashCode() {
return super.hashCode();
}
}