Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Remove magic number and extract Pattern instance from method as class field #4799

Merged
merged 3 commits into from Apr 8, 2018
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+24 −4
Diff settings

Always

Just for now

@@ -180,6 +180,7 @@ public String doc() {
return fields;
}

@Override
public Field field(String fieldName) {
if (type != Type.STRUCT)
throw new DataException("Cannot look up fields on non-struct type");
@@ -336,12 +336,14 @@ public SchemaBuilder field(String fieldName, Schema fieldSchema) {
* Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
* @return the list of fields for this Schema
*/
@Override
public List<Field> fields() {
if (type != Type.STRUCT)
throw new DataException("Cannot list fields on non-struct type");
return new ArrayList<>(fields.values());
}

@Override
public Field field(String fieldName) {
if (type != Type.STRUCT)
throw new DataException("Cannot look up fields on non-struct type");
@@ -505,6 +505,7 @@ private FilterByKeyIterator(Iterator<Header> original, String key) {
this.key = key;
}

@Override
protected Header makeNext() {
while (original.hasNext()) {
Header header = original.next();
@@ -169,5 +169,6 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
* commit has completed. Implementations of this method should only need to perform final cleanup operations, such
* as closing network connections to the sink system.
*/
@Override
public abstract void stop();
}
@@ -83,6 +83,7 @@ public void commit() throws InterruptedException {
* could set a flag that will force {@link #poll()} to exit immediately and invoke
* {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing requests.
*/
@Override
public abstract void stop();

/**
@@ -97,6 +97,7 @@
super(configDef, props);
}

@Override
public Object get(String key) {
return super.get(key);
}
@@ -135,6 +135,7 @@ public void initialize(TaskConfig taskConfig) {
}
}

@Override
protected void close() {
producer.close(30, TimeUnit.SECONDS);
transformationChain.close();
@@ -109,6 +109,8 @@
public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);

private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;

@@ -404,9 +406,9 @@ public void stop() {
forwardRequestExecutor.shutdown();
startAndStopExecutor.shutdown();

if (!forwardRequestExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS))
if (!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
forwardRequestExecutor.shutdownNow();
if (!startAndStopExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS))
if (!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
startAndStopExecutor.shutdownNow();
} catch (InterruptedException e) {
// ignore
@@ -93,6 +93,7 @@ public WorkerCoordinator(LogContext logContext,
this.rejoinRequested = false;
}

@Override
public void requestRejoin() {
rejoinRequested = true;
}
@@ -315,12 +316,14 @@ public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

Measurable numConnectors = new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return assignmentSnapshot.connectors().size();
}
};

Measurable numTasks = new Measurable() {
@Override
public double measure(MetricConfig config, long now) {
return assignmentSnapshot.tasks().size();
}
@@ -238,6 +238,7 @@ private void loadJdbcDrivers(final ClassLoader loader) {
// implementing the java.sql.Driver interface.
AccessController.doPrivileged(
new PrivilegedAction<Void>() {
@Override
public Void run() {
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
Driver.class,
@@ -62,6 +62,7 @@
public class RestServer {
private static final Logger log = LoggerFactory.getLogger(RestServer.class);

private static final Pattern LISTENER_PATTERN = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;

private static final String PROTOCOL_HTTP = "http";
@@ -118,8 +119,7 @@ public void createConnectors(List<String> listeners) {
* Creates Jetty connector according to configuration
*/
public Connector createConnector(String listener) {
Pattern listenerPattern = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
Matcher listenerMatcher = listenerPattern.matcher(listener);
Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);

if (!listenerMatcher.matches())
throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");
@@ -67,12 +67,14 @@ public StandaloneHerder(Worker worker, String kafkaClusterId) {
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}

@Override
public synchronized void start() {
log.info("Herder starting");
startServices();
log.info("Herder started");
}

@Override
public synchronized void stop() {
log.info("Herder stopping");

@@ -88,6 +88,7 @@ private void load() {
}
}

@Override
protected void save() {
try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
Map<byte[], byte[]> raw = new HashMap<>();
@@ -81,6 +81,7 @@
.field("seqno", Schema.INT64_SCHEMA)
.build();

@Override
public String version() {
return new SchemaSourceConnector().version();
}
@@ -52,6 +52,7 @@ private EmptyUrlType(final List<String> endings) {
this.endings = endings;
}

@Override
public boolean matches(URL url) {
final String protocol = url.getProtocol();
final String externalForm = url.toExternalForm();
@@ -66,6 +67,7 @@ public boolean matches(URL url) {
return false;
}

@Override
public Dir createDir(final URL url) throws Exception {
return emptyVfsDir(url);
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.