Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NUTCH-2451 protocol-ftp to resolve relative URL when following redirects #241

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/java/org/apache/nutch/plugin/PluginManifestParser.java
Expand Up @@ -30,6 +30,7 @@
import javax.xml.parsers.ParserConfigurationException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.w3c.dom.Document;
Expand All @@ -49,7 +50,8 @@ public class PluginManifestParser {
private static final String ATTR_CLASS = "class";
private static final String ATTR_ID = "id";

public static final Logger LOG = PluginRepository.LOG;
protected static final Logger LOG = LoggerFactory
.getLogger(PluginManifestParser.class);

private static final boolean WINDOWS = System.getProperty("os.name")
.startsWith("Windows");
Expand All @@ -71,7 +73,8 @@ public PluginManifestParser(Configuration conf,
* folders to search plugins from
* @return A {@link Map} of all found {@link PluginDescriptor}s.
*/
public Map<String, PluginDescriptor> parsePluginFolder(String[] pluginFolders) {
public Map<String, PluginDescriptor> parsePluginFolder(
String[] pluginFolders) {
Map<String, PluginDescriptor> map = new HashMap<>();

if (pluginFolders == null) {
Expand Down Expand Up @@ -158,8 +161,8 @@ private PluginDescriptor parseManifestFile(String pManifestPath)
* @throws ParserConfigurationException
* @throws DocumentException
*/
private Document parseXML(URL url) throws SAXException, IOException,
ParserConfigurationException {
private Document parseXML(URL url)
throws SAXException, IOException, ParserConfigurationException {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
return builder.parse(url.openStream());
Expand Down Expand Up @@ -220,8 +223,8 @@ private void parseRequires(Element pRootElement, PluginDescriptor pDescriptor)
* @param pDescriptor
* @throws MalformedURLException
*/
private void parseLibraries(Element pRootElement, PluginDescriptor pDescriptor)
throws MalformedURLException {
private void parseLibraries(Element pRootElement,
PluginDescriptor pDescriptor) throws MalformedURLException {
NodeList nodelist = pRootElement.getElementsByTagName("runtime");
if (nodelist.getLength() > 0) {

Expand Down
175 changes: 142 additions & 33 deletions src/java/org/apache/nutch/plugin/PluginRepository.java
Expand Up @@ -21,6 +21,9 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -43,8 +46,13 @@
* descriptor represents all meta information about a plugin. So a plugin
* instance will be created later when it is required, this allow lazy plugin
* loading.
*
* As protocol-plugins need to be registered with the JVM as well, this class
* also acts as URLStreamHanderFactory that registers with the JVM and supports
* all the new protocols as if they were native. Details how the JVM creates
* URLs can be seen in the API documentation for the URL constructor.
*/
public class PluginRepository {
public class PluginRepository implements URLStreamHandlerFactory {
private static final WeakHashMap<String, PluginRepository> CACHE = new WeakHashMap<>();

private boolean auto;
Expand Down Expand Up @@ -90,9 +98,12 @@ public PluginRepository(Configuration conf) throws RuntimeException {
try {
installExtensions(fRegisteredPlugins);
} catch (PluginRuntimeException e) {
LOG.error(e.toString());
LOG.error("Could not install extensions.", e.toString());
throw new RuntimeException(e.getMessage());
}

registerURLStreamHandlerFactory();

displayStatus();
}

Expand Down Expand Up @@ -120,7 +131,7 @@ private void installExtensionPoints(List<PluginDescriptor> plugins) {
for (PluginDescriptor plugin : plugins) {
for (ExtensionPoint point : plugin.getExtenstionPoints()) {
String xpId = point.getId();
LOG.debug("Adding extension point " + xpId);
LOG.debug("Adding extension point {}", xpId);
fExtensionPoints.put(xpId, point);
}
}
Expand All @@ -137,9 +148,8 @@ private void installExtensions(List<PluginDescriptor> pRegisteredPlugins)
String xpId = extension.getTargetPoint();
ExtensionPoint point = getExtensionPoint(xpId);
if (point == null) {
throw new PluginRuntimeException("Plugin ("
+ descriptor.getPluginId() + "), " + "extension point: " + xpId
+ " does not exist.");
throw new PluginRuntimeException("Plugin (" + descriptor.getPluginId()
+ "), " + "extension point: " + xpId + " does not exist.");
}
point.addExtension(extension);
}
Expand All @@ -149,8 +159,8 @@ private void installExtensions(List<PluginDescriptor> pRegisteredPlugins)
private void getPluginCheckedDependencies(PluginDescriptor plugin,
Map<String, PluginDescriptor> plugins,
Map<String, PluginDescriptor> dependencies,
Map<String, PluginDescriptor> branch) throws MissingDependencyException,
CircularDependencyException {
Map<String, PluginDescriptor> branch)
throws MissingDependencyException, CircularDependencyException {

if (dependencies == null) {
dependencies = new HashMap<>();
Expand All @@ -164,8 +174,8 @@ private void getPluginCheckedDependencies(PluginDescriptor plugin,
for (String id : plugin.getDependencies()) {
PluginDescriptor dependency = plugins.get(id);
if (dependency == null) {
throw new MissingDependencyException("Missing dependency " + id
+ " for plugin " + plugin.getPluginId());
throw new MissingDependencyException(
"Missing dependency " + id + " for plugin " + plugin.getPluginId());
}
if (branch.containsKey(id)) {
throw new CircularDependencyException("Circular dependency detected "
Expand Down Expand Up @@ -196,7 +206,8 @@ private Map<String, PluginDescriptor> getPluginCheckedDependencies(
* @return List
*/
private List<PluginDescriptor> getDependencyCheckedPlugins(
Map<String, PluginDescriptor> filtered, Map<String, PluginDescriptor> all) {
Map<String, PluginDescriptor> filtered,
Map<String, PluginDescriptor> all) {
if (filtered == null) {
return null;
}
Expand All @@ -223,8 +234,8 @@ private List<PluginDescriptor> getDependencyCheckedPlugins(
* @return PluginDescriptor[]
*/
public PluginDescriptor[] getPluginDescriptors() {
return fRegisteredPlugins.toArray(new PluginDescriptor[fRegisteredPlugins
.size()]);
return fRegisteredPlugins
.toArray(new PluginDescriptor[fRegisteredPlugins.size()]);
}

/**
Expand Down Expand Up @@ -278,10 +289,10 @@ public Plugin getPluginInstance(PluginDescriptor pDescriptor)
synchronized (pDescriptor) {
Class<?> pluginClass = getCachedClass(pDescriptor,
pDescriptor.getPluginClass());
Constructor<?> constructor = pluginClass.getConstructor(new Class<?>[] {
PluginDescriptor.class, Configuration.class });
Plugin plugin = (Plugin) constructor.newInstance(new Object[] {
pDescriptor, this.conf });
Constructor<?> constructor = pluginClass.getConstructor(
new Class<?>[] { PluginDescriptor.class, Configuration.class });
Plugin plugin = (Plugin) constructor
.newInstance(new Object[] { pDescriptor, this.conf });
plugin.startUp();
fActivatedPlugins.put(pDescriptor.getPluginId(), plugin);
return plugin;
Expand Down Expand Up @@ -336,14 +347,14 @@ public Class getCachedClass(PluginDescriptor pDescriptor, String className)
}

private void displayStatus() {
LOG.info("Plugin Auto-activation mode: [" + this.auto + "]");
LOG.info("Plugin Auto-activation mode: [{}]", this.auto);
LOG.info("Registered Plugins:");

if ((fRegisteredPlugins == null) || (fRegisteredPlugins.size() == 0)) {
LOG.info("\tNONE");
} else {
for (PluginDescriptor plugin : fRegisteredPlugins) {
LOG.info("\t" + plugin.getName() + " (" + plugin.getPluginId() + ")");
LOG.info("\t{} ({})", plugin.getName(), plugin.getPluginId());
}
}

Expand All @@ -352,7 +363,7 @@ private void displayStatus() {
LOG.info("\tNONE");
} else {
for (ExtensionPoint ep : fExtensionPoints.values()) {
LOG.info("\t" + ep.getName() + " (" + ep.getId() + ")");
LOG.info("\t ({})", ep.getName(), ep.getId());
}
}
}
Expand Down Expand Up @@ -388,11 +399,11 @@ private Map<String, PluginDescriptor> filter(Pattern excludes,
}

if (!includes.matcher(id).matches()) {
LOG.debug("not including: " + id);
LOG.debug("not including: {}", id);
continue;
}
if (excludes.matcher(id).matches()) {
LOG.debug("excluding: " + id);
LOG.debug("excluding: {}", id);
continue;
}
map.put(plugin.getPluginId(), plugin);
Expand Down Expand Up @@ -431,8 +442,8 @@ public synchronized Object[] getOrderedPlugins(Class<?> clazz,
}

try {
ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
xPointId);
ExtensionPoint point = PluginRepository.get(conf)
.getExtensionPoint(xPointId);
if (point == null)
throw new RuntimeException(xPointId + " not found.");
Extension[] extensions = point.getExtensions();
Expand All @@ -450,9 +461,8 @@ public synchronized Object[] getOrderedPlugins(Class<?> clazz,
for (String orderedFilter : orderOfFilters) {
Object f = filterMap.get(orderedFilter);
if (f == null) {
LOG.error(clazz.getSimpleName() + " : " + orderedFilter
+ " declared in configuration property " + orderProperty
+ " but not found in an active plugin - ignoring.");
LOG.error("{} : {} declared in configuration property {} but not found in an active plugin - ignoring.", clazz.getSimpleName(), orderedFilter
, orderProperty);
continue;
}
sorted.add(f);
Expand All @@ -461,8 +471,8 @@ public synchronized Object[] getOrderedPlugins(Class<?> clazz,
for (int i = 0; i < sorted.size(); i++) {
filter[i] = sorted.get(i);
if (LOG.isTraceEnabled()) {
LOG.trace(clazz.getSimpleName() + " : filters[" + i + "] = "
+ filter[i].getClass());
LOG.trace("{} : filters[{}] = {}", clazz.getSimpleName() , i,
filter[i].getClass());
}
}
objectCache.setObject(clazz.getName(), filter);
Expand All @@ -487,8 +497,8 @@ public synchronized Object[] getOrderedPlugins(Class<?> clazz,
*/
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err
.println("Usage: PluginRepository pluginId className [arg1 arg2 ...]");
System.err.println(
"Usage: PluginRepository pluginId className [arg1 arg2 ...]");
return;
}
Configuration conf = NutchConfiguration.create();
Expand All @@ -505,8 +515,8 @@ public static void main(String[] args) throws Exception {
try {
clazz = Class.forName(args[1], true, cl);
} catch (Exception e) {
System.err.println("Could not load the class '" + args[1] + ": "
+ e.getMessage());
System.err.println(
"Could not load the class '" + args[1] + ": " + e.getMessage());
return;
}
Method m = null;
Expand All @@ -521,4 +531,103 @@ public static void main(String[] args) throws Exception {
System.arraycopy(args, 2, subargs, 0, subargs.length);
m.invoke(null, new Object[] { subargs });
}

/**
* Registers this PluginRepository to be invoked whenever URLs have to be
* parsed. This allows to check the registered protocol plugins for uncommon
* protocols.
*/
private void registerURLStreamHandlerFactory() {
org.apache.nutch.plugin.URLStreamHandlerFactory.getInstance().registerPluginRepository(this);
}

/**
* Invoked whenever a java.net.URL needs to be instantiated. Tries to find a
* suitable extension and allow it to provide a URLStreamHandler. This is done
* by several attempts:
* <ul>
* <li>Find a protocol plugin that implements the desired protocol. If found,
* instantiate it so eventually the plugin can install a URLStreamHandler
* through a static hook.</li>
* <li>If the plugin specifies a URLStreamHandler in its <tt>plugin.xml</tt>,
* return an instance of this URLStreamHandler. Example:
*
* <pre>
* ...
* &lt;implementation id="org.apache.nutch.protocol.foo.Foo" class="org.apache.nutch.protocol.foo.Foo"&gt;
* &lt;parameter name="protocolName" value="foo"/&gt;
* &lt;parameter name="urlStreamHandler" value="org.apache.nutch.protocol.foo.Handler"/&gt;
* &lt;/implementation&gt;
* ...
* </pre>
*
* </li>
* <li>if all else fails, return null. This will fallback to the JVM's method
* of evaluating the system property <tt>java.protocol.handler.pkgs</tt>.</li>
* </ul>
*
* @return the URLStreamHandler found, or null.
* @see java.net.URL
*/
public URLStreamHandler createURLStreamHandler(String protocol) {
LOG.debug("createURLStreamHandler({})", protocol);

if (fExtensionPoints != null) {
ExtensionPoint ep = fExtensionPoints
.get("org.apache.nutch.protocol.Protocol");
if (ep != null) {
Extension[] extensions = ep.getExtensions();
for (Extension extension : extensions) {
String p = extension.getAttribute("protocolName");
LOG.trace("Found {}", p);
if (p.equals(protocol)) {
LOG.debug("suitable {}", p);

// instantiate the plugin. This allows it to execute a static hook,
// if present. Extensions and PluginInstances are cached already, so we
// should not create too many instances
Object extinst = null;
try {
extinst = extension.getExtensionInstance();
LOG.debug("found {}", extinst.getClass().getName());
} catch (Exception e) {
LOG.warn("Could not find {}", extension.getId(), e);
}

// return the handler here, if possible
String handlerClass = extension.getAttribute("urlStreamHandler");
LOG.debug("urlStreamHandler={}", handlerClass);
if (handlerClass != null) {
// instantiate the handler and return it
ClassLoader cl = this.getClass().getClassLoader(); // the nutch
// classloader
LOG.trace("Using nutch classloader {}", cl);
if (extinst != null) {
cl = extinst.getClass().getClassLoader(); // the extension's
// classloader
LOG.trace("Using extension classloader {}", cl);
}

try {
Class clazz = cl.loadClass(handlerClass);
return (URLStreamHandler) clazz.newInstance();
} catch (Exception e) {
LOG.error("Could not instantiate protocol {} handler class {} defined by extension {}", protocol, handlerClass, extension.getId(), e);
return null;
}
}

LOG.debug(
"suitable protocol extension found that did not declare a handler");
return null;
}
}
LOG.debug("No suitable protocol extensions registered");
} else {
LOG.debug("No protocol extensions registered?");
}
}

return null;
}
}