Skip to content
Permalink
Browse files
[IOTDB-3407] Checkstyle: force to use safe thread schedule interface (#…
  • Loading branch information
ericpai committed Jun 8, 2022
1 parent 34c8b6f commit d19061563816dd1984f97bef5c8139fc1f570f9b
Showing 25 changed files with 246 additions and 102 deletions.
@@ -40,7 +40,25 @@
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="SuppressWarningsFilter" />
<module name="TreeWalker">
<module name="SuppressWarningsHolder" />
<!--ERROR severity rules, each Java file should obey this -->
<module name="AvoidStarImport">
<property name="severity" value="error"/>
</module>
<module name="RegexpSinglelineJava">
<property name="id" value="unsafeThreadSchedule"/>
<property name="format" value="schedule(AtFixedRate|WithFixedDelay)\("/>
<property name="severity" value="error"/>
<property name="ignoreComments" value="true"/>
<property name="message" value="Using ScheduledExecutorService::schedule(AtFixedRate|WithFixedDelay) directly is unsafe, please use ScheduledExecutorUtil::safelySchedule(AtFixedRate|WithFixedDelay) instead."/>
</module>
<!--WARNING severity rules, which may be promoted to error level-->
<module name="OuterTypeFilename"/>
<module name="IllegalTokenText">
<property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
@@ -52,13 +70,6 @@
<property name="allowByTailComment" value="true"/>
<property name="allowNonPrintableEscapes" value="true"/>
</module>
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="AvoidStarImport">
<property name="severity" value="error"/>
</module>
<module name="OneTopLevelClass"/>
<module name="NoLineWrap"/>
<module name="EmptyBlock">
@@ -212,13 +223,10 @@
<property name="target" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
</module>
<module name="JavadocMethod">
<property name="scope" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingThrowsTags" value="true"/>
<property name="validateThrows" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
<property name="minLineCount" value="2"/>
<property name="allowedAnnotations" value="Override, Test"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
</module>
<module name="MethodName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9_]*$"/>
@@ -57,6 +57,7 @@
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
@@ -189,14 +190,16 @@ public boolean initLocalEngines() {

private void initTasks() {
reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
reportThread.scheduleAtFixedRate(
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
reportThread,
this::generateNodeReport,
ClusterConstant.REPORT_INTERVAL_SEC,
ClusterConstant.REPORT_INTERVAL_SEC,
TimeUnit.SECONDS);
hardLinkCleanerThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
hardLinkCleanerThread.scheduleAtFixedRate(
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
hardLinkCleanerThread,
new HardLinkCleaner(),
ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
@@ -33,6 +33,7 @@
import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;

@@ -152,8 +153,10 @@ protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier appli
ClusterDescriptor.getInstance().getConfig().getLogDeleteCheckIntervalSecond();

if (logDeleteCheckIntervalSecond > 0) {

this.deleteLogFuture =
deleteLogExecutorService.scheduleAtFixedRate(
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
deleteLogExecutorService,
this::checkDeleteLog,
logDeleteCheckIntervalSecond,
logDeleteCheckIntervalSecond,
@@ -24,6 +24,7 @@
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.StableEntryManager;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -169,7 +170,8 @@ private void initCommonProperties() {
.build());

this.persistLogDeleteLogFuture =
persistLogDeleteExecutorService.scheduleAtFixedRate(
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
persistLogDeleteExecutorService,
this::checkDeletePersistRaftLog,
LOG_DELETE_CHECK_INTERVAL_SECOND,
LOG_DELETE_CHECK_INTERVAL_SECOND,
@@ -29,6 +29,7 @@
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;

import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -57,7 +58,8 @@ public PullSnapshotHintService(DataGroupMember member) {

public void start() {
this.service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "PullSnapshotHint");
this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
this.service, this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
}

public void stop() {
@@ -35,6 +35,7 @@
import org.apache.iotdb.cluster.utils.nodetool.function.NodeToolCmd;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.StartupException;
@@ -92,19 +93,20 @@ public void start() throws StartupException {
private void startCollectClusterStatus() {
// monitor all nodes' live status
LOGGER.info("start metric node status and leader distribution");
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.CLUSTER_MONITOR.getName())
.scheduleAtFixedRate(
() -> {
MetaGroupMember metaGroupMember = ClusterIoTDB.getInstance().getMetaGroupMember();
if (metaGroupMember != null
&& metaGroupMember.getLeader().equals(metaGroupMember.getThisNode())) {
metricNodeStatus(metaGroupMember);
metricLeaderDistribution(metaGroupMember);
}
},
10L,
10L,
TimeUnit.SECONDS);
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.CLUSTER_MONITOR.getName()),
() -> {
MetaGroupMember metaGroupMember = ClusterIoTDB.getInstance().getMetaGroupMember();
if (metaGroupMember != null
&& metaGroupMember.getLeader().equals(metaGroupMember.getThisNode())) {
metricNodeStatus(metaGroupMember);
metricLeaderDistribution(metaGroupMember);
}
},
10L,
10L,
TimeUnit.SECONDS);
}

private void metricLeaderDistribution(MetaGroupMember metaGroupMember) {
@@ -25,6 +25,7 @@
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
@@ -82,7 +83,8 @@ public PartitionManager(Manager configManager, PartitionInfo partitionInfo) {
this.partitionInfo = partitionInfo;
this.regionCleaner =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("IoTDB-Region-Cleaner");
regionCleaner.scheduleAtFixedRate(
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
regionCleaner,
this::clearDeletedRegions,
REGION_CLEANER_WORK_INITIAL_DELAY,
REGION_CLEANER_WORK_INTERVAL,
@@ -86,6 +86,7 @@ void initSession() {
sessionPoolSize);
}

@SuppressWarnings("unsafeThreadSchedule")
void initScheduler() {
if (batchSize > 0) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.commons.concurrent.threadpool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorUtil {

private static final Logger logger = LoggerFactory.getLogger(ScheduledExecutorUtil.class);

/**
* A safe wrapper method to make sure the exception thrown by the previous running will not affect
* the next one. Please reference the javadoc of {@link
* ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for more details.
*
* @param executor the ScheduledExecutorService instance.
* @param command same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
* long, long, TimeUnit)}.
* @param initialDelay same parameter in {@link
* ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
* @param period same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
* long, long, TimeUnit)}.
* @param unit same parameter in {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
* long, long, TimeUnit)}.
* @return the same return value of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable,
* long, long, TimeUnit)}.
*/
@SuppressWarnings("unsafeThreadSchedule")
public static ScheduledFuture<?> safelyScheduleAtFixedRate(
ScheduledExecutorService executor,
Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
return executor.scheduleAtFixedRate(
() -> {
try {
command.run();
} catch (Throwable t) {
logger.error("Schedule task failed", t);
}
},
initialDelay,
period,
unit);
}

/**
* A safe wrapper method to make sure the exception thrown by the previous running will not affect
* the next one. Please reference the javadoc of {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} for more
* details.
*
* @param executor the ScheduledExecutorService instance.
* @param command same parameter in {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
* @param initialDelay same parameter in {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
* @param delay same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
* long, long, TimeUnit)}.
* @param unit same parameter in {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable,
* long, long, TimeUnit)}.
* @return the same return value of {@link
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}.
*/
@SuppressWarnings("unsafeThreadSchedule")
public static ScheduledFuture<?> safelyScheduleWithFixedDelay(
ScheduledExecutorService executor,
Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
return executor.scheduleWithFixedDelay(
() -> {
try {
command.run();
} catch (Throwable t) {
logger.error("Schedule task failed", t);
}
},
initialDelay,
delay,
unit);
}
}
@@ -58,12 +58,14 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}

@Override
@SuppressWarnings("unsafeThreadSchedule")
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return service.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
@SuppressWarnings("unsafeThreadSchedule")
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
@@ -56,12 +56,14 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}

@Override
@SuppressWarnings("unsafeThreadSchedule")
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return service.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
@SuppressWarnings("unsafeThreadSchedule")
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return service.scheduleWithFixedDelay(command, initialDelay, delay, unit);
@@ -120,7 +120,7 @@ public void testNewSingleThreadScheduledExecutor() throws InterruptedException {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(POOL_NAME, handler);
for (int i = 0; i < threadCount; i++) {
Runnable task = new TestThread(reason);
ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
ScheduledFuture<?> future = exec.schedule(task, 0, TimeUnit.SECONDS);
try {
future.get();
} catch (ExecutionException e) {
@@ -147,7 +147,7 @@ public void testNewScheduledThreadPool() throws InterruptedException {
IoTDBThreadPoolFactory.newScheduledThreadPool(threadCount / 2, POOL_NAME, handler);
for (int i = 0; i < threadCount; i++) {
Runnable task = new TestThread(reason);
ScheduledFuture<?> future = exec.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
ScheduledFuture<?> future = exec.schedule(task, 0, TimeUnit.SECONDS);
try {
future.get();
} catch (ExecutionException e) {
@@ -942,12 +942,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<version>3.1.2</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.18</version>
<version>8.45.1</version>
</dependency>
</dependencies>
<executions>

0 comments on commit d190615

Please sign in to comment.