Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make time-related variables more readable #6158

Merged
merged 3 commits into from Aug 21, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -28,11 +28,12 @@
import javax.annotation.concurrent.GuardedBy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class Log4jShutdown implements ShutdownCallbackRegistry, LifeCycle
{
private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;
private static final long SHUTDOWN_WAIT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);

private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
@@ -100,7 +101,7 @@ public void start()
public void stop()
{
if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT_MILLIS);
if (current != State.STOPPED) {
throw new ISE("Expected state [%s] found [%s]", State.STARTED, current);
}
@@ -25,13 +25,13 @@

import java.util.Collections;
import java.util.List;

import java.util.concurrent.TimeUnit;

public class AmbariMetricsEmitterConfig
{
private static final int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD_MILLIS = (long) (60 * 1000); // flush every one minute
private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec
private static final String DEFAULT_PROTOCOL = "http";

@JsonProperty
@@ -106,7 +106,7 @@ public AmbariMetricsEmitterConfig(
);
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
}

@JsonProperty
@@ -56,7 +56,7 @@
private final List<Emitter> requestLogEmitters;
private final AtomicBoolean started = new AtomicBoolean(false);
private final LinkedBlockingQueue<GraphiteEvent> eventsQueue;
private static final long FLUSH_TIMEOUT = 60000; // default flush wait 1 min
private static final long FLUSH_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // default flush wait 1 min
private final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("GraphiteEmitter-%s")
@@ -222,7 +222,7 @@ public void flush()
if (started.get()) {
Future future = exec.schedule(new ConsumerRunnable(), 0, TimeUnit.MILLISECONDS);
try {
future.get(FLUSH_TIMEOUT, TimeUnit.MILLISECONDS);
future.get(FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
@@ -25,15 +25,15 @@

import java.util.Collections;
import java.util.List;

import java.util.concurrent.TimeUnit;

public class GraphiteEmitterConfig
{
public static final String PLAINTEXT_PROTOCOL = "plaintext";
public static final String PICKLE_PROTOCOL = "pickle";
private static final int DEFAULT_BATCH_SIZE = 100;
private static final Long DEFAULT_FLUSH_PERIOD = (long) (60 * 1000); // flush every one minute
private static final long DEFAULT_GET_TIMEOUT = 1000; // default wait for get operations on the queue 1 sec
private static final long DEFAULT_FLUSH_PERIOD_MILLIS = TimeUnit.MINUTES.toMillis(1); // flush every one minute
private static final long DEFAULT_GET_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1); // default wait for get operations on the queue 1 sec

@JsonProperty
private final String hostname;
@@ -142,15 +142,15 @@ public GraphiteEmitterConfig(
@JsonProperty("waitForEventTime") Long waitForEventTime
)
{
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT : waitForEventTime;
this.waitForEventTime = waitForEventTime == null ? DEFAULT_GET_TIMEOUT_MILLIS : waitForEventTime;
this.emitWaitTime = emitWaitTime == null ? 0 : emitWaitTime;
this.alertEmitters = alertEmitters == null ? Collections.emptyList() : alertEmitters;
this.requestLogEmitters = requestLogEmitters == null ? Collections.emptyList() : requestLogEmitters;
this.druidToGraphiteEventConverter = Preconditions.checkNotNull(
druidToGraphiteEventConverter,
"Event converter can not ne null dude"
);
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD : flushPeriod;
this.flushPeriod = flushPeriod == null ? DEFAULT_FLUSH_PERIOD_MILLIS : flushPeriod;
this.maxQueueSize = maxQueueSize == null ? Integer.MAX_VALUE : maxQueueSize;
this.hostname = Preconditions.checkNotNull(hostname, "hostname can not be null");
this.port = Preconditions.checkNotNull(port, "port can not be null");
@@ -43,6 +43,7 @@
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaEightSimpleConsumerFirehoseFactory implements
@@ -72,7 +73,7 @@

private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
private static final int CONSUMER_FETCH_TIMEOUT = 10000;
private static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(10);

This comment has been minimized.

Copy link
@jihoonson

jihoonson Aug 14, 2018

Contributor

long makes more sense to me. Would you please change the type as well?

This comment has been minimized.

Copy link
@asdf2014

asdf2014 Aug 16, 2018

Author Member

Yes, I have tried before. But, io.druid.firehose.kafka.KafkaSimpleConsumer#fetch method needs an int type timeoutMs param.


@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@@ -307,7 +308,7 @@ public void run()
try {
while (!stopped.get()) {
try {
Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT_MILLIS);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
@@ -49,6 +49,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
@@ -74,10 +75,10 @@
private List<HostAndPort> replicaBrokers;
private SimpleConsumer consumer = null;

private static final int SO_TIMEOUT = 30000;
private static final int SO_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);

This comment has been minimized.

Copy link
@jihoonson

jihoonson Aug 14, 2018

Contributor

Same here. Can be long.

This comment has been minimized.

Copy link
@asdf2014

asdf2014 Aug 16, 2018

Author Member

Yes, I have tried it before. But, the constructor of SimpleConsumer needs an int type soTimeout parameter.

private static final int BUFFER_SIZE = 65536;
private static final long RETRY_INTERVAL = 1000L;
private static final int FETCH_SIZE = 100000000;
private static final long RETRY_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(1);
private static final int FETCH_SIZE = 100_000_000;

public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
{
@@ -121,7 +122,7 @@ private void ensureConsumer(Broker leader) throws InterruptedException
);

consumer = new SimpleConsumer(
leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, clientId
);
}
}
@@ -306,7 +307,7 @@ private PartitionMetadata findLeader() throws InterruptedException
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT_MILLIS, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));

List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -361,7 +362,7 @@ private Broker findNewLeader(Broker oldLeader) throws InterruptedException
}
}

Thread.sleep(RETRY_INTERVAL);
Thread.sleep(RETRY_INTERVAL_MILLIS);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
@@ -243,7 +243,7 @@ private static Sketch deserializeFromMemory(Memory mem)
{
UNION,
INTERSECT,
NOT;
NOT
}

public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
@@ -61,6 +61,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
*
@@ -149,7 +150,7 @@ public long milliseconds()
@Override
public long nanoseconds()
{
return milliseconds() * 1_000_000;
return TimeUnit.MILLISECONDS.toNanos(milliseconds());
}

@Override
@@ -448,7 +448,7 @@ public void run()
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
@@ -65,6 +65,7 @@
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class KafkaIndexTask extends AbstractTask implements ChatHandler
@@ -83,7 +84,7 @@
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
static final long POLL_TIMEOUT = 100;
static final long POLL_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(100);
static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;

private final DataSchema dataSchema;
@@ -371,7 +371,7 @@ public void run()
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT);
records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
@@ -327,7 +327,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
@@ -220,7 +220,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
// which will typically be either the main data processing loop or the persist thread.

// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
@@ -35,14 +35,15 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class Tasks
{
public static final int DEFAULT_REALTIME_TASK_PRIORITY = 75;
public static final int DEFAULT_BATCH_INDEX_TASK_PRIORITY = 50;
public static final int DEFAULT_MERGE_TASK_PRIORITY = 25;
public static final int DEFAULT_TASK_PRIORITY = 0;
public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
public static final long DEFAULT_LOCK_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);

public static final String PRIORITY_KEY = "priority";
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
@@ -54,6 +54,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@@ -68,6 +69,8 @@
*/
public class TaskQueue
{
private final long MANAGEMENT_WAIT_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(60);

private final List<Task> tasks = Lists.newArrayList();
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = Maps.newHashMap();

@@ -290,7 +293,7 @@ public String apply(Task task)
}
// awaitNanos because management may become necessary without this condition signalling,
// due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
managementMayBeNecessary.awaitNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
}
finally {
giant.unlock();
@@ -41,14 +41,15 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.TimeUnit;

/**
*/
public class ChannelResourceFactory implements ResourceFactory<String, ChannelFuture>
{
private static final Logger log = new Logger(ChannelResourceFactory.class);

private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT = 10000L; /* 10 seconds */
private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);

private final ClientBootstrap bootstrap;
private final SSLContext sslContext;
@@ -65,7 +66,7 @@ public ChannelResourceFactory(
this.bootstrap = Preconditions.checkNotNull(bootstrap, "bootstrap");
this.sslContext = sslContext;
this.timer = timer;
this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT;
this.sslHandshakeTimeout = sslHandshakeTimeout >= 0 ? sslHandshakeTimeout : DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS;

if (sslContext != null) {
Preconditions.checkNotNull(timer, "timer is required when sslContext is present");
@@ -25,6 +25,8 @@
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Numbers;

import java.util.concurrent.TimeUnit;

@PublicApi
public class QueryContexts
{
@@ -41,7 +43,7 @@
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = 300_000; // 5 minutes
public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;

public static <T> boolean isBySegment(Query<T> query)
@@ -38,7 +38,7 @@
public class AsyncQueryRunnerTest
{

private static final long TEST_TIMEOUT = 60000;
private static final long TEST_TIMEOUT_MILLIS = 60_000;

This comment has been minimized.

Copy link
@jihoonson

jihoonson Aug 14, 2018

Contributor

TimeUnit.SECONDS.toMillis(60);?

This comment has been minimized.

Copy link
@asdf2014

asdf2014 Aug 16, 2018

Author Member

Yep, I have tried this before too. However, the @Test annotation requires a constant attribute value.


private final ExecutorService executor;
private final Query query;
@@ -53,7 +53,7 @@ public AsyncQueryRunnerTest()
.build();
}

@Test(timeout = TEST_TIMEOUT)
@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testAsyncNature()
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -83,7 +83,7 @@ public Sequence run(QueryPlus queryPlus, Map responseContext)
Assert.assertEquals(Collections.singletonList(1), lazy.toList());
}

@Test(timeout = TEST_TIMEOUT)
@Test(timeout = TEST_TIMEOUT_MILLIS)
public void testQueryTimeoutHonored()
{
QueryRunner baseRunner = new QueryRunner()
@@ -31,12 +31,13 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;

import java.lang.annotation.Annotation;
import java.util.concurrent.TimeUnit;

/**
*/
public class JettyHttpClientModule implements Module
{
private static final long CLIENT_CONNECT_TIMEOUT = 500;
private static final long CLIENT_CONNECT_TIMEOUT_MILLIS = TimeUnit.MILLISECONDS.toMillis(500);

public static JettyHttpClientModule global()
{
@@ -120,7 +121,7 @@ public HttpClient get()
httpClient.setIdleTimeout(config.getReadTimeout().getMillis());
httpClient.setMaxConnectionsPerDestination(config.getNumConnections());
httpClient.setMaxRequestsQueuedPerDestination(config.getNumRequestsQueued());
httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT);
httpClient.setConnectTimeout(CLIENT_CONNECT_TIMEOUT_MILLIS);
httpClient.setRequestBufferSize(config.getRequestBuffersize());
final QueuedThreadPool pool = new QueuedThreadPool(config.getNumMaxThreads());
pool.setName(JettyHttpClientModule.class.getSimpleName() + "-threadPool-" + pool.hashCode());
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.