Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) #14055

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.StringConverter
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.storage.SimpleHeaderConverter
org.apache.kafka.connect.storage.StringConverter
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSinkConnector
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.file.FileStreamSourceConnector
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.json.JsonConverter
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.kafka.connect.mirror.MirrorCheckpointConnector
org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
org.apache.kafka.connect.mirror.MirrorSourceConnector
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
Expand All @@ -35,13 +37,18 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL;
import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD;

/**
* Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
Expand Down Expand Up @@ -122,6 +129,18 @@ public class WorkerConfig extends AbstractConfig {
+ "by the worker's scanner before config providers are initialized and used to "
+ "replace variables.";

public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath "
+ "and plugin.path configuration. This can be one of multiple values with the following meanings:\n"
+ "* " + ONLY_SCAN + ": Discover plugins only by reflection. "
+ "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n"
+ "* " + HYBRID_WARN + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n"
+ "* " + HYBRID_FAIL + ": Discover plugins reflectively and by ServiceLoader. "
+ "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n"
+ "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. "
+ "Plugins which are not discoverable by ServiceLoader may not be usable.";

public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
protected static final String CONFIG_PROVIDERS_DOC =
"Comma-separated names of <code>ConfigProvider</code> classes, loaded and used "
Expand Down Expand Up @@ -199,6 +218,12 @@ protected static ConfigDef baseConfigDef() {
null,
Importance.LOW,
PLUGIN_PATH_DOC)
.define(PLUGIN_DISCOVERY_CONFIG,
Type.STRING,
PluginDiscoveryMode.HYBRID_WARN.toString(),
gharris1727 marked this conversation as resolved.
Show resolved Hide resolved
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)),
Importance.LOW,
PLUGIN_DISCOVERY_DOC)
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
30000, atLeast(0), Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
Expand Down Expand Up @@ -401,6 +426,16 @@ public static String pluginPath(Map<String, String> props) {
return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
}

public static PluginDiscoveryMode pluginDiscovery(Map<String, String> props) {
String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString());
try {
return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + " value, must be one of "
+ Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class)));
}
}

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterRemovalWarnings(props);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;

import java.util.Locale;

/**
* Strategy to use to discover plugins usable on a Connect worker.
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a>
*/
public enum PluginDiscoveryMode {

/**
* Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898.
* <p>Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode:
* <ul>
* <li>{@link org.apache.kafka.common.config.provider.ConfigProvider}</li>
* <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
* <li>{@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
* </ul>
*/
ONLY_SCAN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_WARN,
/**
* Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
* Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader}
*/
HYBRID_FAIL,
/**
* Discover plugins via {@link java.util.ServiceLoader} only.
* Plugins may not be usable if they are not available via {@link java.util.ServiceLoader}
*/
SERVICE_LOAD;

public boolean reflectivelyScan() {
return this != SERVICE_LOAD;
}

public boolean serviceLoad() {
return this != ONLY_SCAN;
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
Loading