Skip to content

Commit

Permalink
Merge bbf28dc into 855fef4
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Feb 17, 2020
2 parents 855fef4 + bbf28dc commit e7f9313
Show file tree
Hide file tree
Showing 23 changed files with 279 additions and 101 deletions.
6 changes: 3 additions & 3 deletions server/src/assembly/resources/conf/iotdb-engine.properties
Expand Up @@ -278,9 +278,9 @@ chunk_merge_point_threshold=20480

# whether to cache meta data (ChunkMetaData, TsFileMetaData and Chunk) or not.
meta_data_cache_enable=true
# Read memory Allocation Ratio: FileMetaDataCache, ChunkMetaDataCache, and Free Memory Used in Query.
# The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 3:6:10
filemeta_chunkmeta_free_memory_proportion=3:6:10
# Read memory Allocation Ratio: FileMetaDataCache, ChunkMetaDataCache, ChunkCache and Free Memory Used in Query.
# The parameter form is a:b:c:d, where a, b, c and d are integers. for example: 1:1:1:1 , 3:6:10:20
filemeta_chunkmeta_chunk_free_memory_proportion=3:6:10:20


####################
Expand Down
25 changes: 19 additions & 6 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
Expand Up @@ -188,12 +188,17 @@ public class IoTDBConfig {
/**
* Memory allocated for fileMetaData cache in read process
*/
private long allocateMemoryForFileMetaDataCache = allocateMemoryForRead * 3 / 19;
private long allocateMemoryForFileMetaDataCache = allocateMemoryForRead * 3 / 39;

/**
* Memory allocated for chunkMetaData cache in read process
*/
private long allocateMemoryForChumkMetaDataCache = allocateMemoryForRead * 6 / 19;
private long allocateMemoryForChunkMetaDataCache = allocateMemoryForRead * 6 / 39;

/**
* Memory allocated for chunk cache in read process
*/
private long allocateMemoryForChunkCache = allocateMemoryForRead * 10 / 39;

/**
* The statMonitor writes statistics info into IoTDB every backLoopPeriodSec secs. The default
Expand Down Expand Up @@ -1047,12 +1052,20 @@ public void setAllocateMemoryForFileMetaDataCache(long allocateMemoryForFileMeta
this.allocateMemoryForFileMetaDataCache = allocateMemoryForFileMetaDataCache;
}

public long getAllocateMemoryForChumkMetaDataCache() {
return allocateMemoryForChumkMetaDataCache;
public long getAllocateMemoryForChunkMetaDataCache() {
return allocateMemoryForChunkMetaDataCache;
}

public void setAllocateMemoryForChunkMetaDataCache(long allocateMemoryForChunkMetaDataCache) {
this.allocateMemoryForChunkMetaDataCache = allocateMemoryForChunkMetaDataCache;
}

public long getAllocateMemoryForChunkCache() {
return allocateMemoryForChunkCache;
}

public void setAllocateMemoryForChumkMetaDataCache(long allocateMemoryForChumkMetaDataCache) {
this.allocateMemoryForChumkMetaDataCache = allocateMemoryForChumkMetaDataCache;
public void setAllocateMemoryForChunkCache(long allocateMemoryForChunkCache) {
this.allocateMemoryForChunkCache = allocateMemoryForChunkCache;
}

public boolean isEnableWatermark() {
Expand Down
Expand Up @@ -521,7 +521,7 @@ private void initMemoryAllocate(Properties properties) {
}

String queryMemoryAllocateProportion = properties
.getProperty("filemeta_chunkmeta_free_memory_proportion");
.getProperty("filemeta_chunkmeta_chunk_free_memory_proportion");
if (queryMemoryAllocateProportion != null) {
String[] proportions = queryMemoryAllocateProportion.split(":");
int proportionSum = 0;
Expand All @@ -532,8 +532,10 @@ private void initMemoryAllocate(Properties properties) {
try {
conf.setAllocateMemoryForFileMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) / proportionSum);
conf.setAllocateMemoryForChumkMetaDataCache(
conf.setAllocateMemoryForChunkMetaDataCache(
maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) / proportionSum);
conf.setAllocateMemoryForChunkCache(
maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) / proportionSum);
} catch (Exception e) {
throw new RuntimeException(
"Each subsection of configuration item filemeta_chunkmeta_free_memory_proportion should be an integer, which is "
Expand Down
Expand Up @@ -66,6 +66,11 @@ public double getTsfileMetaDataHitRatio() {
return tsfileMetaDataHitRatio;
}

// Exposed via CacheHitRatioMonitorMXBean; delegates to the ChunkCache singleton,
// which maintains the hit/request counters.
@Override
public double getChunkHitRatio() {
return ChunkCache.getInstance().calculateChunkHitRatio();
}

public static CacheHitRatioMonitor getInstance() {
return instance;
}
Expand Down
Expand Up @@ -23,4 +23,6 @@ public interface CacheHitRatioMonitorMXBean {
double getChunkMetaDataHitRatio();

double getTsfileMetaDataHitRatio();

double getChunkHitRatio();
}
150 changes: 150 additions & 0 deletions server/src/main/java/org/apache/iotdb/db/engine/cache/ChunkCache.java
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.engine.cache;

import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This class is used to cache <code>Chunk</code> of <code>ChunkMetaData</code> in IoTDB. The caching
* strategy is LRU.
*/
/**
 * This class is used to cache <code>Chunk</code> of <code>ChunkMetaData</code> in IoTDB. The
 * caching strategy is LRU. Thread-safe: all cache access runs under the write lock (see note in
 * {@link #get}).
 */
public class ChunkCache {

  private static final Logger logger = LoggerFactory.getLogger(ChunkCache.class);
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  // memory budget for cached chunks, configured through
  // filemeta_chunkmeta_chunk_free_memory_proportion
  private static final long MEMORY_THRESHOLD_IN_CHUNK_CACHE =
      config.getAllocateMemoryForChunkCache();
  // the flag that gates the metadata caches also gates this cache
  private static boolean cacheEnable = config.isMetaDataCacheEnable();

  private final LRULinkedHashMap<ChunkMetaData, Chunk> lruCache;

  private AtomicLong cacheHitNum = new AtomicLong();
  private AtomicLong cacheRequestNum = new AtomicLong();

  private final ReadWriteLock lock = new ReentrantReadWriteLock();


  private ChunkCache() {
    // access-order LRU map bounded by memory size rather than entry count
    lruCache = new LRULinkedHashMap<ChunkMetaData, Chunk>(MEMORY_THRESHOLD_IN_CHUNK_CACHE, true) {
      @Override
      protected long calEntrySize(ChunkMetaData key, Chunk value) {
        // charge only the key's shallow size; the chunk payload dominates the entry cost
        return RamUsageEstimator.shallowSizeOf(key) + RamUsageEstimator.sizeOf(value);
      }
    };
  }

  public static ChunkCache getInstance() {
    return ChunkCacheHolder.INSTANCE;
  }

  /**
   * Returns the chunk described by {@code chunkMetaData}, loading it through {@code reader} on a
   * cache miss. The returned chunk wraps a duplicated buffer so each caller gets an independent
   * read position over the shared payload bytes.
   *
   * @param chunkMetaData metadata identifying the chunk to fetch (also the cache key)
   * @param reader        reader used to load the chunk when it is not cached
   * @throws IOException if reading the chunk from the underlying file fails
   */
  public Chunk get(ChunkMetaData chunkMetaData, TsFileSequenceReader reader) throws IOException {
    if (!cacheEnable) {
      return duplicate(reader.readMemChunk(chunkMetaData), reader);
    }

    cacheRequestNum.incrementAndGet();

    // NOTE(review): lruCache is access-ordered, so even get() structurally reorders the map;
    // a read lock is therefore not sufficient for cache hits. All access runs under the write
    // lock, and the lock is released in finally so an IOException thrown by readMemChunk cannot
    // leak it (the previous version held the write lock without try/finally, which would
    // deadlock every subsequent cache access after a read failure).
    lock.writeLock().lock();
    try {
      Chunk chunk = lruCache.get(chunkMetaData);
      if (chunk != null) {
        cacheHitNum.incrementAndGet();
        printCacheLog(true);
      } else {
        printCacheLog(false);
        chunk = reader.readMemChunk(chunkMetaData);
        lruCache.put(chunkMetaData, chunk);
      }
      return duplicate(chunk, reader);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Wraps a chunk in a new <code>Chunk</code> that shares the payload bytes but carries an
   * independent buffer position, so concurrent consumers do not disturb each other's reads.
   */
  private static Chunk duplicate(Chunk chunk, TsFileSequenceReader reader) {
    return new Chunk(chunk.getHeader(), chunk.getData().duplicate(), chunk.getDeletedAt(),
        reader.getEndianType());
  }

  private void printCacheLog(boolean isHit) {
    if (!logger.isDebugEnabled()) {
      return;
    }
    // tag fixed: this is the Chunk cache, not the ChunkMetaData cache (copy-paste error)
    logger.debug(
        "[Chunk cache {}hit] The number of requests for cache is {}, hit rate is {}.",
        isHit ? "" : "didn't ", cacheRequestNum.get(),
        cacheHitNum.get() * 1.0 / cacheRequestNum.get());
  }

  /** Hit ratio observed so far; 0 when no request has been served yet (avoids 0/0). */
  public double calculateChunkHitRatio() {
    if (cacheRequestNum.get() != 0) {
      return cacheHitNum.get() * 1.0 / cacheRequestNum.get();
    } else {
      return 0;
    }
  }


  /**
   * clear LRUCache.
   */
  public void clear() {
    lock.writeLock().lock();
    try {
      // lruCache is final and assigned in the constructor, so no null check is needed
      lruCache.clear();
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Evicts the entry for the given metadata, if present; no-op for null. */
  public void remove(ChunkMetaData chunkMetaData) {
    if (chunkMetaData == null) {
      return;
    }
    lock.writeLock().lock();
    try {
      lruCache.remove(chunkMetaData);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * singleton pattern (initialization-on-demand holder).
   */
  private static class ChunkCacheHolder {

    private static final ChunkCache INSTANCE = new ChunkCache();
  }
}
Expand Up @@ -18,13 +18,6 @@
*/
package org.apache.iotdb.db.engine.cache;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
Expand All @@ -37,6 +30,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class is used to cache <code>List<ChunkMetaData></code> of tsfile in IoTDB. The caching
* strategy is LRU.
Expand All @@ -45,7 +42,7 @@ public class DeviceMetaDataCache {

private static final Logger logger = LoggerFactory.getLogger(DeviceMetaDataCache.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long MEMORY_THRESHOLD_IN_B = config.getAllocateMemoryForChumkMetaDataCache();
private static final long MEMORY_THRESHOLD_IN_B = config.getAllocateMemoryForChunkMetaDataCache();
private static StorageEngine storageEngine = StorageEngine.getInstance();
private static boolean cacheEnable = config.isMetaDataCacheEnable();
/**
Expand Down
Expand Up @@ -245,8 +245,8 @@ private RamUsageEstimator() {
// Java 7+, HotSpot
try {
hotSpotBean = ManagementFactory.class
.getMethod("getPlatformMXBean", Class.class)
.invoke(null, beanClazz);
.getMethod("getPlatformMXBean", Class.class)
.invoke(null, beanClazz);
} catch (Exception e1) {
// Java 6, HotSpot
try {
Expand All @@ -255,15 +255,15 @@ private RamUsageEstimator() {
} catch (Exception e2) {
// Last resort option is an attempt to get it from ManagementFactory's server anyway (may start AWT).
hotSpotBean = ManagementFactory.newPlatformMXBeanProxy(
ManagementFactory.getPlatformMBeanServer(),
"com.sun.management:type=HotSpotDiagnostic", beanClazz);
ManagementFactory.getPlatformMBeanServer(),
"com.sun.management:type=HotSpotDiagnostic", beanClazz);
}
}
if (hotSpotBean != null) {
final Method getVMOptionMethod = beanClazz.getMethod("getVMOption", String.class);
final Object vmOption = getVMOptionMethod.invoke(hotSpotBean, "ObjectAlignmentInBytes");
objectAlignment = Integer.parseInt(
vmOption.getClass().getMethod("getValue").invoke(vmOption).toString()
vmOption.getClass().getMethod("getValue").invoke(vmOption).toString()
);
supportedFeatures.add(JvmFeature.OBJECT_ALIGNMENT);
}
Expand All @@ -274,8 +274,8 @@ private RamUsageEstimator() {
NUM_BYTES_OBJECT_ALIGNMENT = objectAlignment;

JVM_INFO_STRING = "[JVM: " +
Constants.JVM_NAME + ", " + Constants.JVM_VERSION + ", " + Constants.JVM_VENDOR + ", " +
Constants.JAVA_VENDOR + ", " + Constants.JAVA_VERSION + "]";
Constants.JVM_NAME + ", " + Constants.JVM_VERSION + ", " + Constants.JVM_VENDOR + ", " +
Constants.JAVA_VENDOR + ", " + Constants.JAVA_VERSION + "]";
}

/**
Expand Down Expand Up @@ -565,8 +565,8 @@ private static ClassCache createCacheEntry(final Class<?> clazz) {
}

cachedInfo = new ClassCache(
alignObjectSize(shallowInstanceSize),
referenceFields.toArray(new Field[referenceFields.size()]));
alignObjectSize(shallowInstanceSize),
referenceFields.toArray(new Field[referenceFields.size()]));
return cachedInfo;
}

Expand All @@ -583,7 +583,7 @@ private static long adjustForField(long sizeSoFar, final Field f) {
if (objectFieldOffsetMethod != null) {
try {
final long offsetPlusSize =
((Number) objectFieldOffsetMethod.invoke(theUnsafe, f)).longValue() + fsize;
((Number) objectFieldOffsetMethod.invoke(theUnsafe, f)).longValue() + fsize;
return Math.max(sizeSoFar, offsetPlusSize);
} catch (IllegalAccessException ex) {
throw new RuntimeException("Access problem with sun.misc.Unsafe", ex);
Expand All @@ -598,8 +598,8 @@ private static long adjustForField(long sizeSoFar, final Field f) {
// this should never happen (Unsafe does not declare
// checked Exceptions for this method), but who knows?
throw new RuntimeException("Call to Unsafe's objectFieldOffset() throwed " +
"checked Exception when accessing field " +
f.getDeclaringClass().getName() + "#" + f.getName(), cause);
"checked Exception when accessing field " +
f.getDeclaringClass().getName() + "#" + f.getName(), cause);
}
} else {
// TODO: No alignments based on field type/ subclass fields alignments?
Expand Down Expand Up @@ -628,7 +628,7 @@ public static EnumSet<JvmFeature> getSupportedFeatures() {
*/
public static String humanReadableUnits(long bytes) {
return humanReadableUnits(bytes,
new DecimalFormat("0.#", DecimalFormatSymbols.getInstance(Locale.ROOT)));
new DecimalFormat("0.#", DecimalFormatSymbols.getInstance(Locale.ROOT)));
}

/**
Expand Down Expand Up @@ -716,7 +716,7 @@ public IdentityHashSet(int initialCapacity, float loadFactor) {
initialCapacity = Math.max(MIN_CAPACITY, initialCapacity);

assert initialCapacity > 0 : "Initial capacity must be between (0, "
+ Integer.MAX_VALUE + "].";
+ Integer.MAX_VALUE + "].";
assert loadFactor > 0 && loadFactor < 1 : "Load factor must be between (0, 1).";
this.loadFactor = loadFactor;
allocateBuffers(roundCapacity(initialCapacity));
Expand Down Expand Up @@ -824,7 +824,7 @@ private void allocateBuffers(int capacity) {
protected int nextCapacity(int current) {
assert current > 0 && Long.bitCount(current) == 1 : "Capacity must be a power of two.";
assert ((current << 1) > 0) : "Maximum capacity exceeded ("
+ (0x80000000 >>> 1) + ").";
+ (0x80000000 >>> 1) + ").";

if (current < MIN_CAPACITY / 2) {
current = MIN_CAPACITY / 2;
Expand Down Expand Up @@ -957,7 +957,7 @@ private Constants() {
*/
@Deprecated
public static final boolean JRE_IS_MINIMUM_JAVA6 =
new Boolean(true).booleanValue(); // prevent inlining in foreign class files
new Boolean(true).booleanValue(); // prevent inlining in foreign class files

public static final boolean JRE_IS_MINIMUM_JAVA7;
public static final boolean JRE_IS_MINIMUM_JAVA8;
Expand All @@ -975,7 +975,7 @@ private Constants() {
unsafeField.setAccessible(true);
final Object unsafe = unsafeField.get(null);
final int addressSize = ((Number) unsafeClass.getMethod("addressSize")
.invoke(unsafe)).intValue();
.invoke(unsafe)).intValue();
//System.out.println("Address size: " + addressSize);
is64Bit = addressSize >= 8;
} catch (Exception e) {
Expand Down

0 comments on commit e7f9313

Please sign in to comment.