Make time-related variables more readable (#6158)
* Make time-related variables more readable

* Apply improvements suggested by the code reviewer

* Remove unnecessary boxing of Long type variables
asdf2014 authored and jon-wei committed Aug 21, 2018
1 parent 266f3df commit 3647d4c
Showing 24 changed files with 75 additions and 60 deletions.
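The recurring pattern across these files, condensed into a minimal before/after sketch (the holder class is hypothetical; the constants are taken from the hunks below):

import java.util.concurrent.TimeUnit;

public class FlushConfigSketch
{
  // Before: a magic number whose unit the reader must guess, needlessly
  // boxed as Long and auto-unboxed at every use site.
  // private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000);

  // After: a primitive long whose name states the unit and whose value is
  // derived via TimeUnit, so both the duration and the unit are explicit.
  private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1);
}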
@@ -28,11 +28,12 @@
import javax.annotation.concurrent.GuardedBy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
{
-private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
+private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);

private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public void start()
public void stop()
{
if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
-State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
+State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT_MILLIS);
if (current != State.STOPPED) {
throw new ISE("Expected state [%s] found [%s]", State.STARTED, current);
}
@@ -25,13 +25,13 @@

import java.util.Collections;
import java.util.List;

+import java.util.concurrent.TimeUnit;

public class AmbariMetricsEmitterConfig
{
private static final int DEFAULT_BATCH_SIZE = 100;
-private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
-private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
private static final String DEFAULT_PROTOCOL = "http";

@JsonProperty
@@ -106,7 +106,7 @@ public AmbariMetricsEmitterConfig(
);
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
-this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
}
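The constructor pattern above, used again in GraphiteEmitterConfig below, falls back from a nullable boxed Long (as deserialized by Jackson) to a primitive default; a minimal self-contained sketch with hypothetical names:

import java.util.concurrent.TimeUnit;

class EmitterConfigSketch
{
  private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);

  private final long waitForEventTime;

  EmitterConfigSketch(Long waitForEventTime) // nullable when absent from JSON
  {
    // Unboxes only when a value was actually supplied; the default stays
    // primitive from declaration to use.
    this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
  }
}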

@JsonProperty
@@ -56,7 +56,7 @@ public class GraphiteEmitter implements Emitter
private final List<Emitter> requestLogEmitters;
private final AtomicBoolean started = new AtomicBoolean(false);
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
-private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
+private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public void flush()
if (started.get()) {
Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
try {
-future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
+future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
@@ -25,15 +25,15 @@

import java.util.Collections;
import java.util.List;

+import java.util.concurrent.TimeUnit;

public class GraphiteEmitterConfig
{
public static final String PLAINTEXT_PROTOCOL = "plaintext";
public static final String PICKLE_PROTOCOL = "pickle";
private static final int DEFAULT_BATCH_SIZE = 100;
-private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
-private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
+private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
+private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec

@JsonProperty
private final String hostname;
@@ -142,15 +142,15 @@ public GraphiteEmitterConfig(
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
-this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
+this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.requestLogEmitters = requestLogEmitters == null ? Collections.emptyList() : requestLogEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter,
"Event converter can not ne null dude"
);
-this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
+this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = Preconditions.checkNotNull(port, "port can not be null");
@@ -43,6 +43,7 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements

private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
-private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);

@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public void run()
try {
while (!stopped.get()) {
try {
-Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
+Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
@@ -49,6 +49,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;

/**
* refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@ public class KafkaSimpleConsumer
private List<HostAndPort> replicaBrokers;
private SimpleConsumer consumer = null;

-private static final int SO_TIMEOUT = 30000;
+private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);
private static final int BUFFER_SIZE = 65536;
-private static final long RETRY_INTERVAL = 1000L;
-private static final int FETCH_SIZE = 100000000;
+private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1);
+private static final int FETCH_SIZE = 100_000_000;

public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
{
@@ -121,7 +122,7 @@ private void ensureConsumer(Broker leader) throws InterruptedException
);

consumer = new SimpleConsumer(
-leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
+leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId
);
}
}
@@ -306,7 +307,7 @@ private PartitionMetadata findLeader() throws InterruptedException
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
-consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
+consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));

List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ private Broker findNewLeader(Broker oldLeader) throws InterruptedException
}
}

-Thread.sleep(RETRY_INTERVAL);
+Thread.sleep(RETRY_INTERVAL_MILLIS);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
@@ -243,7 +243,7 @@ public enum Func
{
UNION,
INTERSECT,
-NOT;
+NOT
}
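An aside on the one non-time change in this commit: the semicolon dropped above is only required when declarations follow the constant list inside an enum body; with bare constants it is redundant. A hypothetical illustration:

enum WithMembers
{
  UNION, INTERSECT, NOT; // semicolon required: a member follows

  boolean isSetOp()
  {
    return true;
  }
}

enum ConstantsOnly
{
  UNION, INTERSECT, NOT // no further members, so no semicolon is needed
}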

public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
@@ -61,6 +61,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;

/**
*
@@ -149,7 +150,7 @@ public long milliseconds()
@Override
public long nanoseconds()
{
-return milliseconds() * 1_000_000;
+return TimeUnit.MILLISECONDS.toNanos(milliseconds());
}
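A side note on the replacement above (a JDK guarantee, not something this commit relies on): TimeUnit conversions match the hand-written arithmetic for in-range values, but saturate at Long.MAX_VALUE or Long.MIN_VALUE instead of silently overflowing:

// Assumes: import java.util.concurrent.TimeUnit;
long ns = TimeUnit.MILLISECONDS.toNanos(5);               // 5_000_000L
long sat = TimeUnit.MILLISECONDS.toNanos(Long.MAX_VALUE); // Long.MAX_VALUE (saturated)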

@Override
@@ -448,7 +448,7 @@ public void run()
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
-records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
@@ -65,6 +65,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@ public enum Status
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
-static final long POLL_TIMEOUT = 100;
+static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;

private final DataSchema dataSchema;
@@ -371,7 +371,7 @@ public void run()
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
-records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
+records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
@@ -327,7 +327,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
-final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
@@ -220,7 +220,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
// which will typically be either the main data processing loop or the persist thread.

// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
-final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
+final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
@@ -35,14 +35,15 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;

public class Tasks
{
public static final int DEFAULT_REALTIME_TASK_PRIORITY = 75;
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
public static final int DEFAULT_TASK_PRIORITY = 0;
-public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
+public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);

public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
@@ -54,6 +54,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@@ -68,6 +69,8 @@
*/
public class TaskQueue
{
+private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);

private final List<Task> tasks = Lists.newArrayList();
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();

@@ -290,7 +293,7 @@ public String apply(Task task)
}
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
-managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
+managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
}
finally {
giant.unlock();
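For context, a minimal sketch of the lock-and-condition wait this constant feeds (field names mirror the hunk; the class is hypothetical). awaitNanos may return early on a signal or spuriously, which is why the comment above stresses that management may become necessary without the condition signalling:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ManagementLoopSketch
{
  private final ReentrantLock giant = new ReentrantLock();
  private final Condition managementMayBeNecessary = giant.newCondition();
  private final long waitNanos = TimeUnit.SECONDS.toNanos(60);

  void awaitManagementSignal() throws InterruptedException
  {
    giant.lock();
    try {
      // Wakes on signal, after roughly 60 seconds, or spuriously;
      // callers must re-check their state either way.
      managementMayBeNecessary.awaitNanos(waitNanos);
    }
    finally {
      giant.unlock();
    }
  }
}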
@@ -41,14 +41,15 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.concurrent.TimeUnit;

/**
*/
public class ChannelResourceFactory implements ResourceFactory<String, ChannelFuture>
{
private static final Logger log = new Logger(ChannelResourceFactory.class);

-private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10 seconds */
+private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);

private final ClientBootstrap bootstrap;
private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public ChannelResourceFactory(
this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
this.sslContext = sslContext;
this.timer = timer;
-this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
+this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;

if (sslContext != null) {
Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
4 changes: 3 additions & 1 deletion processing/src/main/java/io/druid/query/QueryContexts.java
@@ -25,6 +25,8 @@
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Numbers;

+import java.util.concurrent.TimeUnit;

@PublicApi
public class QueryContexts
{
@@ -41,7 +43,7 @@ public class QueryContexts
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
-public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
+public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;

public static <T> boolean isBySegment(Query<T> query)
@@ -38,7 +38,7 @@
public class AsyncQueryRunnerTest
{

-private static final long TEST_TIMEOUT = 60000;
+private static final long TEST_TIMEOUT_MILLIS = 60_000;

private final ExecutorService executor;
private final Query query;
@@ -53,7 +53,7 @@ public AsyncQueryRunnerTest()
.build();
}

-@Test(timeout = TEST_TIMEOUT)
+@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testAsyncNature()
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext)
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}

-@Test(timeout = TEST_TIMEOUT)
+@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testQueryTimeoutHonored()
{
QueryRunner baseRunner = new QueryRunner()
@@ -31,12 +31,13 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;

import java.lang.annotation.Annotation;
+import java.util.concurrent.TimeUnit;

/**
*/
public class JettyHttpClientModule implements Module
{
-private static final long CLIENT_CONNECT_TIMEOUT = 500;
+private static final long CLIENT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(500);

public static JettyHttpClientModule global()
{
@@ -120,7 +121,7 @@ public HttpClient get()
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
-httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
+httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
httpClient.setRequestBufferSize(config.getRequestBuffersize());
final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());