From 4b4a425a8f4042f04ca0f83142aff31cdea882d7 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 21 Apr 2016 16:42:34 -0700 Subject: [PATCH 1/3] Traverse CLASSPATH during herder start --- .../kafka/connect/runtime/AbstractHerder.java | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index a22f15c135e5..dbddd0778698 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -84,6 +84,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con private Map tempConnectors = new ConcurrentHashMap<>(); private static final List> SKIPPED_CONNECTORS = Arrays.>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); private static List validConnectorPlugins; + private static final Object lock = new Object(); + private Thread classPathTraverser; + public AbstractHerder(Worker worker, String workerId, @@ -101,12 +104,18 @@ protected void startServices() { this.worker.start(); this.statusBackingStore.start(); this.configBackingStore.start(); + traverseClassPath(); } protected void stopServices() { this.statusBackingStore.stop(); this.configBackingStore.stop(); this.worker.stop(); + try { + this.classPathTraverser.join(); + } catch (InterruptedException e) { + // ignore as it can only happen during shutdown + } } @Override @@ -248,22 +257,24 @@ public ConfigInfos validateConfigs(String connType, Map connecto } public static List connectorPlugins() { - if (validConnectorPlugins != null) { - return validConnectorPlugins; - } + synchronized (lock) { + if (validConnectorPlugins != null) { + return validConnectorPlugins; + } - Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); - Set> connectorClasses = reflections.getSubTypesOf(Connector.class); - connectorClasses.removeAll(SKIPPED_CONNECTORS); - List connectorPlugins = new LinkedList<>(); - for (Class connectorClass: connectorClasses) { - int mod = connectorClass.getModifiers(); - if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { - connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); + Set> connectorClasses = reflections.getSubTypesOf(Connector.class); + connectorClasses.removeAll(SKIPPED_CONNECTORS); + List connectorPlugins = new LinkedList<>(); + for (Class connectorClass : connectorClasses) { + int mod = connectorClass.getModifiers(); + if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { + connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + } } + validConnectorPlugins = connectorPlugins; + return connectorPlugins; } - validConnectorPlugins = connectorPlugins; - return connectorPlugins; } // public for testing @@ -354,4 +365,14 @@ private String trace(Throwable t) { return null; } } + + private void traverseClassPath() { + classPathTraverser = new Thread(new Runnable() { + @Override + public void run() { + connectorPlugins(); + } + }, "CLASSPATH traversal thread."); + classPathTraverser.start(); + } } From d8e2e17286da72e34bd48d10380b74a0f493db4e Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 21 Apr 2016 17:32:10 -0700 Subject: [PATCH 2/3] Fix checkstyle --- .../java/org/apache/kafka/connect/runtime/AbstractHerder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index dbddd0778698..bfb2705ad680 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -84,7 +84,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con private Map tempConnectors = new ConcurrentHashMap<>(); private static final List> SKIPPED_CONNECTORS = Arrays.>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); private static List validConnectorPlugins; - private static final Object lock = new Object(); + private static final Object LOCK = new Object(); private Thread classPathTraverser; @@ -257,7 +257,7 @@ public ConfigInfos validateConfigs(String connType, Map connecto } public static List connectorPlugins() { - synchronized (lock) { + synchronized (LOCK) { if (validConnectorPlugins != null) { return validConnectorPlugins; } From 92ac64a84b261a452daf94bb96dedfa0edf7aeaf Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 21 Apr 2016 17:54:09 -0700 Subject: [PATCH 3/3] Fix test failues --- .../apache/kafka/connect/runtime/AbstractHerder.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index bfb2705ad680..bd735895b996 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -111,10 +111,12 @@ protected void stopServices() { this.statusBackingStore.stop(); this.configBackingStore.stop(); this.worker.stop(); - try { - this.classPathTraverser.join(); - } catch (InterruptedException e) { - // ignore as it can only happen during shutdown + if (this.classPathTraverser != null) { + try { + this.classPathTraverser.join(); + } catch (InterruptedException e) { + // ignore as it can only happen during shutdown + } } }