Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
Expand Down Expand Up @@ -98,7 +99,8 @@ public static void bind() {

// bind load related metrics
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
MetricService.getInstance().addMetricSet(ActiveLoadingFilesMetricsSet.getInstance());
MetricService.getInstance().addMetricSet(ActiveLoadingFilesNumberMetricsSet.getInstance());
MetricService.getInstance().addMetricSet(ActiveLoadingFilesSizeMetricsSet.getInstance());
}

private static void initSystemMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
package org.apache.iotdb.db.storageengine.load.active;

import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;

import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.conf.TSFileConfig;
Expand Down Expand Up @@ -113,13 +114,15 @@ private void hotReloadActiveLoadDirs() {
}
}
}

// Hot reload active load listening dir for pipe data sync
// Active load is always enabled for pipe data sync
listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());

// Create directories if not exists
listeningDirs.forEach(this::createDirectoriesIfNotExists);

ActiveLoadingFilesNumberMetricsSet.getInstance().updatePendingDirList(listeningDirs);
ActiveLoadingFilesSizeMetricsSet.getInstance().updatePendingDirList(listeningDirs);
} catch (final Exception e) {
LOGGER.warn(
"Error occurred during hot reload active load dirs. "
Expand Down Expand Up @@ -147,23 +150,45 @@ private static String getTsFilePath(final String filePathWithResourceOrModsTail)

// Metrics
public long countAndReportActiveListeningDirsFileNumber() {
final long[] fileCount = {0};
long totalFileCount = 0;
long totalFileSize = 0;

try {
for (String dir : listeningDirs) {
for (final String dir : listeningDirs) {
final long[] fileCountInDir = {0};
final long[] fileSizeInDir = {0};

Files.walkFileTree(
new File(dir).toPath(),
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
fileCount[0]++;
fileCountInDir[0]++;
try {
fileSizeInDir[0] += file.toFile().length();
} catch (Exception e) {
LOGGER.debug("Failed to count active listening dirs file number.", e);
}
return FileVisitResult.CONTINUE;
}
});

ActiveLoadingFilesNumberMetricsSet.getInstance()
.updatePendingFileCounterInDir(dir, fileCountInDir[0]);
ActiveLoadingFilesSizeMetricsSet.getInstance()
.updatePendingFileCounterInDir(dir, fileSizeInDir[0]);

totalFileCount += fileCountInDir[0];
totalFileSize += fileSizeInDir[0];
}
ActiveLoadingFilesMetricsSet.getInstance().recordPendingFileCounter(fileCount[0]);

ActiveLoadingFilesNumberMetricsSet.getInstance()
.updateTotalPendingFileCounter(totalFileCount);
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalPendingFileCounter(totalFileSize);
} catch (final IOException e) {
LOGGER.debug("Failed to count active listening dirs file number.", e);
}
return fileCount[0];

return totalFileCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iotdb.db.storageengine.load.active;

import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;

import org.apache.tsfile.utils.Pair;

Expand All @@ -39,7 +39,7 @@ public synchronized boolean enqueue(final String file, final boolean isGenerated
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));

ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
return true;
}
return false;
Expand All @@ -51,16 +51,16 @@ public synchronized Pair<String, Boolean> dequeueFromPending() {
pendingFileSet.remove(pair.left);
loadingFileSet.add(pair.left);

ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(1);
ActiveLoadingFilesMetricsSet.getInstance().recordQueuingFileCounter(-1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
}
return pair;
}

public synchronized void removeFromLoading(final String file) {
loadingFileSet.remove(file);

ActiveLoadingFilesMetricsSet.getInstance().recordLoadingFileCounter(-1);
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(-1);
}

public synchronized boolean isFilePendingOrLoading(final String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -96,6 +97,9 @@ private void initFailDirIfNecessary() {
e);
}
failDir.set(IOTDB_CONFIG.getLoadActiveListeningFailDir());

ActiveLoadingFilesSizeMetricsSet.getInstance().updateFailedDir(failDir.get());
ActiveLoadingFilesNumberMetricsSet.getInstance().updateFailedDir(failDir.get());
}
}
}
Expand Down Expand Up @@ -262,6 +266,8 @@ public boolean isFilePendingOrLoading(final String filePath) {
// Metrics
public long countAndReportFailedFileNumber() {
final long[] fileCount = {0};
final long[] fileSize = {0};

try {
initFailDirIfNecessary();
Files.walkFileTree(
Expand All @@ -270,13 +276,21 @@ public long countAndReportFailedFileNumber() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
fileCount[0]++;
try {
fileSize[0] += file.toFile().length();
} catch (Exception e) {
LOGGER.debug("Failed to count failed files in fail directory.", e);
}
return FileVisitResult.CONTINUE;
}
});
ActiveLoadingFilesMetricsSet.getInstance().recordFailedFileCounter(fileCount[0]);

ActiveLoadingFilesNumberMetricsSet.getInstance().updateTotalFailedFileCounter(fileCount[0]);
ActiveLoadingFilesSizeMetricsSet.getInstance().updateTotalFailedFileCounter(fileSize[0]);
} catch (final IOException e) {
LOGGER.warn("Failed to count failed files in fail directory.", e);
LOGGER.debug("Failed to count failed files in fail directory.", e);
}

return fileCount[0];
}
}
Loading