Commit

Perform style refactoring based on code inspection (linkedin#1657)
efeg committed Aug 19, 2021
1 parent 92423ce commit 8f2488e
Showing 101 changed files with 350 additions and 498 deletions.
4 changes: 2 additions & 2 deletions build.gradle
@@ -436,8 +436,8 @@ task buildApiWiki(type: Exec) {
}

static def getScalaBinaryVersion(versionStr) {
String[] versionList = versionStr.split("\\.");
return versionList[0] + "." + versionList[1];
String[] versionList = versionStr.split("\\.")
return versionList[0] + "." + versionList[1]
}

//wrapper generation task
6 changes: 3 additions & 3 deletions buildSrc/build.gradle
@@ -6,16 +6,16 @@ plugins {

repositories {
mavenLocal()
jcenter()
mavenCentral()
}

dependencies {
compile gradleApi()
compile localGroovy()

compile 'org.ajoberstar:gradle-git:1.7.2'
compile 'org.ajoberstar:grgit:1.7.2'
compile 'org.apache.httpcomponents:fluent-hc:4.5.13'
compile('org.jfrog.buildinfo:build-info-extractor-gradle:4.21.0') {
compile('org.jfrog.buildinfo:build-info-extractor-gradle:4.24.14') {
exclude module: 'groovy-all'
}
}
@@ -10,14 +10,14 @@ import org.jfrog.gradle.plugin.artifactory.dsl.ArtifactoryPluginConvention
class DistributeTask extends DefaultTask {

@TaskAction
public void distributeBuild() {
void distributeBuild() {
ArtifactoryPluginConvention convention = project.convention.plugins.artifactory
def buildNumber = convention.clientConfig.info.buildNumber
def buildName = convention.clientConfig.info.buildName
def context = convention.clientConfig.publisher.contextUrl
def password = convention.clientConfig.publisher.password

if (password == null || password.equals("")) {
if (password == null || password == "") {
throw new IllegalArgumentException("password not set")
}

@@ -41,8 +41,8 @@ class DistributeTask extends DefaultTask {
.returnResponse()

def bout = new ByteArrayOutputStream()
response.getEntity().writeTo(bout);
String errMsg = new String(bout.toByteArray());
response.getEntity().writeTo(bout)
String errMsg = new String(bout.toByteArray())
logger.lifecycle("Distribute Response: {} {}", response, errMsg)

if (!Integer.toString(response.getStatusLine().getStatusCode()).startsWith("2")) {
8 changes: 4 additions & 4 deletions build_api_wiki.sh
@@ -6,8 +6,8 @@ BASE_DIR=$(pwd)
SCHEMA_WORKDIR="$BASE_DIR/cruise-control/src/yaml"
WIKI_TARGET="$BASE_DIR/target/api_wiki"

rm -rf $WIKI_TARGET
mkdir -p $WIKI_TARGET
rm -rf "$WIKI_TARGET"
mkdir -p "$WIKI_TARGET"

cd $SCHEMA_WORKDIR; npx redoc-cli bundle base.yaml
mv -v $SCHEMA_WORKDIR/redoc-static.html $WIKI_TARGET
cd "$SCHEMA_WORKDIR"; npx redoc-cli bundle base.yaml
mv -v "$SCHEMA_WORKDIR"/redoc-static.html "$WIKI_TARGET"
@@ -56,7 +56,7 @@ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog)
_values.put(update.getKey(), update.getValue());
}
definition.parse(_values);
_used = Collections.synchronizedSet(new HashSet<String>());
_used = Collections.synchronizedSet(new HashSet<>());
this._definition = definition;
if (doLog) {
logAll();
@@ -291,7 +291,7 @@ public <T> List<T> getConfiguredInstances(String key, Class<T> t) throws CruiseC
public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides)
throws CruiseControlException {
List<String> klasses = getList(key);
List<T> objects = new ArrayList<T>();
List<T> objects = new ArrayList<>();
if (klasses == null) {
return objects;
}
@@ -18,7 +18,7 @@ public class ConfigValue {
private boolean _visible;

public ConfigValue(String name) {
this(name, null, new ArrayList<>(), new ArrayList<String>());
this(name, null, new ArrayList<>(), new ArrayList<>());
}

public ConfigValue(String name, Object value, List<Object> recommendedValues, List<String> errorMessages) {
@@ -4,7 +4,6 @@

package com.linkedin.cruisecontrol.detector.metricanomaly;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -24,7 +23,7 @@
public enum MetricAnomalyType {
SUSPECT, RECENT, PERSISTENT;

private static final List<MetricAnomalyType> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));
private static final List<MetricAnomalyType> CACHED_VALUES = List.of(values());

/**
* Use this instead of values() because values() creates a new array each time.
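
Aside: the cached-values idiom this hunk converts to List.of(values()) can be reduced to a self-contained sketch. The enum name and accessor below are illustrative, not copied from the diff.

    import java.util.List;

    public enum ExampleAnomalyType {
        SUSPECT, RECENT, PERSISTENT;

        // List.of(values()) copies the values() array once into an immutable list,
        // so callers can reuse it instead of paying for a fresh array on every values() call.
        private static final List<ExampleAnomalyType> CACHED_VALUES = List.of(values());

        // Use this instead of values(), mirroring the javadoc note in the hunk above.
        public static List<ExampleAnomalyType> cachedValues() {
            return CACHED_VALUES;
        }
    }
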
@@ -516,7 +516,7 @@ private int resetRawValueIndices(long prevOldestWindowIndex, int numIndicesToRes
numAbandonedSamples += rawValues.resetWindowIndices(prevOldestWindowIndex, numIndicesToReset);
}

for (; iterator.hasNext(); ) {
while (iterator.hasNext()) {
RawMetricValues rawValues = iterator.next();
rawValues.updateOldestWindowIndex(currentOldestWindowIndex);
numAbandonedSamples += rawValues.resetWindowIndices(prevOldestWindowIndex, numIndicesToReset);
@@ -41,7 +41,7 @@ class MetricSampleAggregatorState<G, E extends Entity<G>> extends WindowIndexedA
*/
MetricSampleAggregatorState(int numWindows, long windowMs, int completenessCacheSize) {
super();
_completenessCache = new LinkedHashMap<AggregationOptions<G, E>, MetricSampleCompleteness<G, E>>() {
_completenessCache = new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry<AggregationOptions<G, E>, MetricSampleCompleteness<G, E>> eldest) {
return this.size() > completenessCacheSize;
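
Aside: the anonymous-subclass diamond used here requires Java 9 or newer. The underlying bounded-cache idiom, stripped of the aggregator types, looks roughly like the sketch below; the helper method and its capacity parameter are illustrative.

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class BoundedCacheSketch {
        // LinkedHashMap evicts its eldest entry whenever removeEldestEntry returns true,
        // which yields a simple insertion-order bounded cache.
        static <K, V> Map<K, V> boundedCache(int capacity) {
            Map<K, V> cache = new LinkedHashMap<>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                    return size() > capacity;
                }
            };
            return cache;
        }
    }
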
@@ -207,7 +207,7 @@ public String toString() {
for (int i = 0; i < _values.length; i++) {
joiner.add(i + ":" + _values[i]);
}
return String.format("{avg:%f, max:%f, %s}", avg(), max(), joiner.toString());
return String.format("{avg:%f, max:%f, %s}", avg(), max(), joiner);
}

private float updateMax() {
@@ -5,8 +5,6 @@

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

@@ -18,12 +16,9 @@ public class CruiseControlUtilsTest {
private static final long MAX_ESTIMATED_PAUSE = 30L;
private static final Map<Long, String> RESPONSE_BY_TIME_MS;
static {
Map<Long, String> responseByTimeMs = new HashMap<>(4);
responseByTimeMs.put(0L, "1970-01-01T00:00:00Z");
responseByTimeMs.put(-10L, "1969-12-31T23:59:59Z");
responseByTimeMs.put(Long.MAX_VALUE, "+292278994-08-17T07:12:55Z");
responseByTimeMs.put(1614978098383L, "2021-03-05T21:01:38Z");
RESPONSE_BY_TIME_MS = Collections.unmodifiableMap(responseByTimeMs);
RESPONSE_BY_TIME_MS =
Map.of(0L, "1970-01-01T00:00:00Z", -10L, "1969-12-31T23:59:59Z", Long.MAX_VALUE, "+292278994-08-17T07:12:55Z", 1614978098383L,
"2021-03-05T21:01:38Z");
}

@Test
@@ -178,7 +178,7 @@ public static void retry(Supplier<Boolean> function, long scaleMs, int base, int
}
timeToSleep *= base;
Thread.sleep(timeToSleep);
} catch (InterruptedException e) {
} catch (InterruptedException ignored) {

}
}
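
Aside: the catch-variable rename above sits inside an exponential back-off retry helper; a minimal standalone sketch of that pattern, assuming a maxAttempts parameter to complete the truncated signature, is:

    import java.util.function.Supplier;

    final class RetrySketch {
        // Retries the supplied check until it returns true or the attempt budget is spent,
        // multiplying the sleep time by base before each back-off, as in the hunk above.
        static void retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
            long timeToSleep = scaleMs;
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                if (function.get()) {
                    return;
                }
                try {
                    timeToSleep *= base;
                    Thread.sleep(timeToSleep);
                } catch (InterruptedException ignored) {
                    // Interruption just cuts the current back-off short; the loop decides whether to retry.
                }
            }
            throw new IllegalStateException("Condition not met after " + maxAttempts + " attempts");
        }
    }
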
@@ -6,10 +6,8 @@

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -62,32 +60,18 @@ public final class MetricsUtils {

// Name Set.
private static final Set<String> INTERESTED_NETWORK_METRIC_NAMES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REQUESTS_PER_SEC,
REQUEST_QUEUE_SIZE,
RESPONSE_QUEUE_SIZE,
REQUEST_QUEUE_TIME_MS,
LOCAL_TIME_MS,
TOTAL_TIME_MS)));
Set.of(REQUESTS_PER_SEC, REQUEST_QUEUE_SIZE, RESPONSE_QUEUE_SIZE, REQUEST_QUEUE_TIME_MS, LOCAL_TIME_MS, TOTAL_TIME_MS);

private static final Set<String> INTERESTED_TOPIC_METRIC_NAMES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(BYTES_IN_PER_SEC,
BYTES_OUT_PER_SEC,
REPLICATION_BYTES_IN_PER_SEC,
REPLICATION_BYTES_OUT_PER_SEC,
TOTAL_FETCH_REQUEST_PER_SEC,
TOTAL_PRODUCE_REQUEST_PER_SEC,
MESSAGES_IN_PER_SEC)));
private static final Set<String> INTERESTED_LOG_METRIC_NAMES =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(SIZE, LOG_FLUSH_RATE_AND_TIME_MS)));
Set.of(BYTES_IN_PER_SEC, BYTES_OUT_PER_SEC, REPLICATION_BYTES_IN_PER_SEC, REPLICATION_BYTES_OUT_PER_SEC, TOTAL_FETCH_REQUEST_PER_SEC,
TOTAL_PRODUCE_REQUEST_PER_SEC, MESSAGES_IN_PER_SEC);
private static final Set<String> INTERESTED_LOG_METRIC_NAMES = Set.of(SIZE, LOG_FLUSH_RATE_AND_TIME_MS);

private static final Set<String> INTERESTED_SERVER_METRIC_NAMES =
Collections.unmodifiableSet(Collections.singleton(REQUEST_HANDLER_AVG_IDLE_PERCENT));
private static final Set<String> INTERESTED_SERVER_METRIC_NAMES = Collections.singleton(REQUEST_HANDLER_AVG_IDLE_PERCENT);

// Request type set
private static final Set<String> INTERESTED_REQUEST_TYPE =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(CONSUMER_FETCH_REQUEST_TYPE,
FOLLOWER_FETCH_REQUEST_TYPE,
PRODUCE_REQUEST_TYPE)));
Set.of(CONSUMER_FETCH_REQUEST_TYPE, FOLLOWER_FETCH_REQUEST_TYPE, PRODUCE_REQUEST_TYPE);

private MetricsUtils() {

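
Aside: both factory styles in this hunk yield an unmodifiable set; a minimal side-by-side comparison, using placeholder metric-name constants rather than the real ones, is sketched below. One behavioral difference: Set.of rejects null and duplicate elements outright, while the HashSet-based form silently deduplicates.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    final class SetFactoryComparison {
        private static final String REQUESTS_PER_SEC = "RequestsPerSec"; // placeholder constant
        private static final String TOTAL_TIME_MS = "TotalTimeMs";       // placeholder constant

        // Pre-Java 9 idiom: wrap a mutable HashSet so callers cannot modify it.
        static final Set<String> OLD_STYLE =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REQUESTS_PER_SEC, TOTAL_TIME_MS)));

        // Java 9+ idiom: Set.of builds an immutable set directly,
        // throwing IllegalArgumentException on duplicates and NullPointerException on nulls.
        static final Set<String> NEW_STYLE = Set.of(REQUESTS_PER_SEC, TOTAL_TIME_MS);
    }
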
@@ -97,7 +97,7 @@ public Properties overridingProps() {
}

@Test
public void testReportingMetrics() throws ExecutionException, InterruptedException {
public void testReportingMetrics() {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -17,16 +17,17 @@
public class ContainerMetricUtilsTest {

private static final double DELTA = 0.01;
private static final double CPU_PERIOD = 100000.0;

private void mockGetContainerProcessCpuLoad(int processors, double cpuQuota, double cpuPeriod, double cpuUtil, double expectedLoad)
private void mockGetContainerProcessCpuLoad(int processors, double cpuQuota, double cpuUtil, double expectedLoad)
throws Exception {
PowerMock.mockStaticPartial(ContainerMetricUtils.class,
"getAvailableProcessors",
"getCpuPeriod",
"getCpuQuota");

PowerMock.expectPrivate(ContainerMetricUtils.class, "getAvailableProcessors").andReturn(processors);
PowerMock.expectPrivate(ContainerMetricUtils.class, "getCpuPeriod").andReturn(cpuPeriod);
PowerMock.expectPrivate(ContainerMetricUtils.class, "getCpuPeriod").andReturn(CPU_PERIOD);
PowerMock.expectPrivate(ContainerMetricUtils.class, "getCpuQuota").andReturn(cpuQuota);
PowerMock.expectPrivate(ContainerMetricUtils.class, "getCpuQuota").andReturn(cpuQuota);

@@ -40,16 +41,16 @@ public void testGetContainerProcessCpuLoad() throws Exception {
/*
* expectedContainerProcessCpuLoad = (cpuUtil * processors) / (cpuQuota / cpuPeriod)
*/
mockGetContainerProcessCpuLoad(1, 100000.0, 100000.0, 1.0, 1.0);
mockGetContainerProcessCpuLoad(1, 100000.0, 100000.0, 0.5, 0.5);
mockGetContainerProcessCpuLoad(1, 50000.0, 100000.0, 0.5, 1.0);
mockGetContainerProcessCpuLoad(1, 75000.0, 100000.0, 0.5, 0.66);
mockGetContainerProcessCpuLoad(1, 100000.0, 1.0, 1.0);
mockGetContainerProcessCpuLoad(1, 100000.0, 0.5, 0.5);
mockGetContainerProcessCpuLoad(1, 50000.0, 0.5, 1.0);
mockGetContainerProcessCpuLoad(1, 75000.0, 0.5, 0.66);

mockGetContainerProcessCpuLoad(2, 100000.0, 100000.0, 0.5, 1.0);
mockGetContainerProcessCpuLoad(2, 200000.0, 100000.0, 1.0, 1.0);
mockGetContainerProcessCpuLoad(2, 25000.0, 100000.0, 0.125, 1.0);
mockGetContainerProcessCpuLoad(2, 2500.0, 100000.0, 0.0125, 1.0);
mockGetContainerProcessCpuLoad(2, 100000.0, 0.5, 1.0);
mockGetContainerProcessCpuLoad(2, 200000.0, 1.0, 1.0);
mockGetContainerProcessCpuLoad(2, 25000.0, 0.125, 1.0);
mockGetContainerProcessCpuLoad(2, 2500.0, 0.0125, 1.0);

mockGetContainerProcessCpuLoad(2, ContainerMetricUtils.NO_CPU_QUOTA, 100000.0, 0.125, 0.125);
mockGetContainerProcessCpuLoad(2, ContainerMetricUtils.NO_CPU_QUOTA, 0.125, 0.125);
}
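
Aside: the expected values in these cases follow directly from the formula quoted in the comment; a standalone arithmetic check, independent of the PowerMock setup, could look like:

    final class CpuLoadFormulaCheck {
        // expectedContainerProcessCpuLoad = (cpuUtil * processors) / (cpuQuota / cpuPeriod)
        static double containerCpuLoad(int processors, double cpuQuota, double cpuPeriod, double cpuUtil) {
            return (cpuUtil * processors) / (cpuQuota / cpuPeriod);
        }

        public static void main(String[] args) {
            // Mirrors the (1, 75000.0, 0.5, 0.66) case above: 0.5 / 0.75 = 0.666...
            System.out.println(containerCpuLoad(1, 75000.0, 100000.0, 0.5));
            // Mirrors the (2, 25000.0, 0.125, 1.0) case: 0.25 / 0.25 = 1.0
            System.out.println(containerCpuLoad(2, 25000.0, 100000.0, 0.125));
        }
    }
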
}
Expand Up @@ -18,7 +18,7 @@ public class CCEmbeddedZookeeper implements AutoCloseable {
private final int _port;
private final ZooKeeperServer _zk;
private final ServerCnxnFactory _cnxnFactory;
private CountDownLatch _shutdownLatch = null;
private final CountDownLatch _shutdownLatch = null;

public CCEmbeddedZookeeper() {
int tickTime = 500;
@@ -210,18 +210,15 @@ public void startUp() {
* Shutdown Cruise Control.
*/
public void shutdown() {
Thread t = new Thread() {
@Override
public void run() {
LOG.info("Shutting down Kafka Cruise Control...");
_loadMonitor.shutdown();
_executor.shutdown();
_anomalyDetectorManager.shutdown();
_goalOptimizer.shutdown();
closeAdminClientWithTimeout(_adminClient);
LOG.info("Kafka Cruise Control shutdown completed.");
}
};
Thread t = new Thread(() -> {
LOG.info("Shutting down Kafka Cruise Control...");
_loadMonitor.shutdown();
_executor.shutdown();
_anomalyDetectorManager.shutdown();
_goalOptimizer.shutdown();
closeAdminClientWithTimeout(_adminClient);
LOG.info("Kafka Cruise Control shutdown completed.");
});
t.setDaemon(true);
t.start();
try {
@@ -34,6 +34,8 @@
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
@@ -52,6 +54,7 @@
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +70,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.linkedin.kafka.cruisecontrol.config.constants.MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.SKIP_HARD_GOAL_CHECK_PARAM;
import static kafka.log.LogConfig.CleanupPolicyProp;
import static kafka.log.LogConfig.RetentionMsProp;
@@ -758,4 +762,40 @@ public static KafkaCruiseControlConfig readConfig(String propertiesFile) throws
}
return new KafkaCruiseControlConfig(props);
}

/**
* Create a Kafka consumer with the given properties.
*
* @param configs The configurations for Cruise Control.
* @param clientIdPrefix Client id prefix.
* @param bootstrapServers Bootstrap servers.
* @param keyDeserializer Key deserializer of the consumer.
* @param valueDeserializer Value deserializer of the consumer.
* @param isLatestOffsetReset {@code true} to set the value of {@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} to "latest", {@code false}
* to set it to "earliest".
* @param <K> The type of the consumer key.
* @param <KT> The type of the key deserializer.
* @param <V> The type of the consumer value.
* @param <VT> The type of the value deserializer.
* @return A new Kafka consumer.
*/
public static <K, KT extends Deserializer<K>, V, VT extends Deserializer<V>> Consumer<K, V> createConsumer(Map<String, ?> configs,
String clientIdPrefix,
String bootstrapServers,
Class<KT> keyDeserializer,
Class<VT> valueDeserializer,
boolean isLatestOffsetReset) {
long randomToken = RANDOM.nextLong();
Properties consumerProps = new Properties();
consumerProps.putAll(configs);
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientIdPrefix + "-consumer-" + randomToken);
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatestOffsetReset ? "latest" : "earliest");
consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.toString(Integer.MAX_VALUE));
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, configs.get(RECONNECT_BACKOFF_MS_CONFIG).toString());
return new KafkaConsumer<>(consumerProps);
}
}
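
Aside: a hedged usage sketch of the new helper, assuming it lives in KafkaCruiseControlUtils as the surrounding hunks suggest; the client-id prefix, bootstrap servers, and config map below are illustrative.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    final class ConsumerCreationExample {
        // configs stands in for the Cruise Control config map the caller already holds;
        // it must contain MonitorConfig.RECONNECT_BACKOFF_MS_CONFIG, which createConsumer copies through.
        static Consumer<String, String> sampleConsumer(Map<String, ?> configs, String bootstrapServers) {
            return KafkaCruiseControlUtils.createConsumer(configs,
                                                          "sample-loader",          // illustrative client id prefix
                                                          bootstrapServers,
                                                          StringDeserializer.class,
                                                          StringDeserializer.class,
                                                          false);                   // "earliest" offset reset
        }
    }
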
