Skip to content
Permalink
Browse files
Merge pull request #560 from apache/dynamic_config
[IOTDB-247] Optimize Dynamic Config with ActiveTimeSeriesCounter
  • Loading branch information
liuruiyiyang committed Nov 21, 2019
2 parents 85d3bf9 + 8e7861f commit def4daf2addef0062fe04f698a825301ffa343bc
Showing 18 changed files with 452 additions and 38 deletions.
@@ -116,6 +116,11 @@
<artifactId>jfreechart</artifactId>
<version>1.5.0</version>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.9.5</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveTimeSeriesCounter implements IActiveTimeSeriesCounter {

private static final Logger LOGGER = LoggerFactory.getLogger(ActiveTimeSeriesCounter.class);
/**
* Map[StorageGroup, HyperLogLogCounter]
*/
private static Map<String, HyperLogLog> storageGroupHllMap = new ConcurrentHashMap<>();

/**
* Map[StorageGroup, ActiveTimeSeriesRatio]
*/
private static Map<String, Double> activeRatioMap = new ConcurrentHashMap<>();

/**
* Map[StorageGroup, ActiveTimeSeriesNumber]
*/
private static Map<String, Long> activeTimeSeriesNumMap = new ConcurrentHashMap<>();

/**
* LOG2M decide the precision of the HyperLogLog algorithm
*/
static final int LOG2M = 13;

private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

@Override
public void init(String storageGroup) {
storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
activeRatioMap.put(storageGroup, 0D);
activeTimeSeriesNumMap.put(storageGroup, 0L);
}

@Override
public void offer(String storageGroup, String device, String measurement) {
String path = device + IoTDBConstant.PATH_SEPARATOR + measurement;
try {
storageGroupHllMap.get(storageGroup).offer(path);
} catch (Exception e) {
storageGroupHllMap.putIfAbsent(storageGroup, new HyperLogLog(LOG2M));
storageGroupHllMap.get(storageGroup).offer(path);
LOGGER.error("Storage group {} registers active time series {} failed", storageGroup, path,
e);
}
}

@Override
public void updateActiveRatio(String storageGroup) {
lock.writeLock().lock();
try {
long activeTimeSeriesNum = storageGroupHllMap.get(storageGroup).cardinality();
if (activeTimeSeriesNum != activeTimeSeriesNumMap.get(storageGroup)) {
// update the active time series number in the newest memtable to be flushed
activeTimeSeriesNumMap.put(storageGroup, activeTimeSeriesNum);

double totalActiveTsNum = 0;
LOGGER.debug("{}: updating active ratio", Thread.currentThread().getName());
for (double number : activeTimeSeriesNumMap.values()) {
totalActiveTsNum += number;
}
for (Map.Entry<String, Long> entry : activeTimeSeriesNumMap.entrySet()) {
double activeRatio = 0;
if (totalActiveTsNum > 0) {
activeRatio = entry.getValue() / totalActiveTsNum;
}
activeRatioMap.put(entry.getKey(), activeRatio);
LOGGER.debug("{}: storage group {} has an active ratio: {}",
Thread.currentThread().getName(),
entry.getKey(), activeRatio);
}
}
// initialize the HLL counter
storageGroupHllMap.put(storageGroup, new HyperLogLog(LOG2M));
} catch (Exception e) {
LOGGER.error("Update {} active ratio failed", storageGroup, e);
} finally {
lock.writeLock().unlock();
}
}

@Override
public double getActiveRatio(String storageGroup) {
lock.writeLock().lock();
double ratio;
try {
ratio = activeRatioMap.get(storageGroup);
} catch (Exception e) {
ratio = 0;
LOGGER.error("Get active ratio failed", e);
} finally {
lock.writeLock().unlock();
}
return ratio;
}

@Override
public void delete(String storageGroup) {
storageGroupHllMap.remove(storageGroup);
activeRatioMap.remove(storageGroup);
activeTimeSeriesNumMap.remove(storageGroup);
}

private static class ActiveTimeSeriesCounterHolder {
private static final ActiveTimeSeriesCounter INSTANCE = new ActiveTimeSeriesCounter();
}

public static ActiveTimeSeriesCounter getInstance() {
return ActiveTimeSeriesCounterHolder.INSTANCE;
}

/**
* this method is for test
*/
public static void clear() {
storageGroupHllMap = new ConcurrentHashMap<>();
activeRatioMap = new ConcurrentHashMap<>();
activeTimeSeriesNumMap = new ConcurrentHashMap<>();
}
}
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.conf.adapter;

public interface IActiveTimeSeriesCounter {

/**
* Initialize the counter by adding a new HyperLogLog counter for the given storage group
*
* @param storageGroup the given storage group to be initialized
*/
void init(String storageGroup);

/**
* Register a time series to the active time series counter
*
* @param storageGroup the storage group name of the time series
* @param device the device name of the time series
* @param measurement the sensor name of the time series
*/
void offer(String storageGroup, String device, String measurement);

/**
* Update the ActiveRatioMap
*
* @param storageGroup whose counter will be refreshed after the update
*/
void updateActiveRatio(String storageGroup);

/**
* Get the active time series number proportion of the given storage group
*
* @param storageGroup the storage group to be calculated
* @return the active time series number proportion of the given storage group
*/
double getActiveRatio(String storageGroup);

/**
* Delete the counter for the given storage group
*
* @param storageGroup whose counter will be removed
*/
void delete(String storageGroup);

}
@@ -18,6 +18,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.IWritableMemChunk;
@@ -78,9 +79,12 @@ public void syncFlushMemTable() throws ExecutionException, InterruptedException
TVList tvList = series.getSortedTVList();
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
// register active time series to the ActiveTimeSeriesCounter
ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
}
encodingTaskQueue.add(new EndChunkGroupIoTask(memTable.getVersion()));
}
ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
noMoreEncodingTask = true;
logger.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
@@ -32,6 +32,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -237,29 +238,15 @@ boolean shouldFlush() {
* <p>In the dynamic parameter adjustment module{@link IoTDBConfigDynamicAdapter}, it calculated
* the average size of each metatable{@link IoTDBConfigDynamicAdapter#tryToAdaptParameters()}.
* However, considering that the number of timeseries between storage groups may vary greatly,
* it's appropriate to judge whether to flush the memtable according to the average memtable size.
* We need to adjust it according to the number of timeseries in a specific storage group.
*
* Abbreviation of parameters:
*
* 1 memtableSize: m
* 2 maxMemTableNum: Nm
* 3 SeriesNumber: Ns
* 4 chunkSizeThreshold: Sc
* 5 Total timeseries number: Nts {@link IoTDBConfigDynamicAdapter#totalTimeseries}
* 6 MemTable Number for Each SG: Nmes {@link IoTDBConstant#MEMTABLE_NUM_IN_EACH_STORAGE_GROUP}
*
* <p>The equation: Σ(Ns * Sc) * Nmes = m * Nm ==> Σ(Ns) * Sc * Nmes = m * Nm ==> Sc = m * Nm / Nmes / Nts
* <p>Note: Σ means the sum of storage groups , so Nts = ΣNs
* it's inappropriate to judge whether to flush the memtable according to the average memtable
* size. We need to adjust it according to the number of timeseries in a specific storage group.
*
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
return IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries() == 0 ? config
.getMemtableSizeThreshold() :
config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
/ IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP / IoTDBConfigDynamicAdapter.getInstance()
.getTotalTimeseries() * MManager.getInstance().getSeriesNumber(storageGroupName);
long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
/ IoTDBConstant.MEMTABLE_NUM_IN_EACH_STORAGE_GROUP * ActiveTimeSeriesCounter.getInstance().getActiveRatio(storageGroupName));
return Math.max(memTableSize, config.getMemtableSizeThreshold());
}


@@ -36,6 +36,7 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
@@ -596,6 +597,7 @@ public void setStorageGroupToMTree(String path) throws MetadataException {
}
mgraph.setStorageGroup(path);
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
ActiveTimeSeriesCounter.getInstance().init(path);
seriesNumberInStorageGroups.put(path, 0);
if (writeToLog) {
BufferedWriter writer = getLogWriter();
@@ -646,6 +648,7 @@ public boolean deleteStorageGroupsFromMTree(List<Path> deletePathList)
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
mgraph.deleteStorageGroup(delStorageGroup);
seriesNumberInStorageGroups.remove(delStorageGroup);
ActiveTimeSeriesCounter.getInstance().delete(delStorageGroup);
} catch (PathException e) {
try {
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
@@ -82,13 +82,6 @@ private void setUp() throws StartupException {
Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook());
setUncaughtExceptionHandler();

// When registering statMonitor, we should start recovering some statistics
// with latest values stored
// Warn: registMonitor() method should be called after systemDataRecovery()
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().recovery();
}

initMManager();
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
@@ -107,6 +100,13 @@ private void setUp() throws StartupException {
registerManager.register(MetricsService.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);

// When registering statMonitor, we should start recovering some statistics
// with latest values stored
// Warn: registMonitor() method should be called after systemDataRecovery()
if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) {
StatMonitor.getInstance().recovery();
}

logger.info("IoTDB is set up.");
}

0 comments on commit def4daf

Please sign in to comment.