Skip to content

Commit

Permalink
use Java SPI to load and instantiate NetworkModules
Browse files Browse the repository at this point in the history
The usage of the Java Service Provider Interface enables the deployment
of additional custom NetworkModules without modifications of the Paho
code.

Signed-off-by: Maik Scheibler <eclipse@scheibler-family.de>
  • Loading branch information
mscheibler committed Nov 10, 2017
1 parent 7cfc434 commit 4312c60
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 196 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.eclipse.paho.client.mqttv3.internal;

import java.net.URI;
import java.net.URISyntaxException;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class NetworkModuleServiceTest {

@Test
public void testValidateURI() {
NetworkModuleService.validateURI("tcp://host_literal:1883");
NetworkModuleService.validateURI("ssl://host_literal:8883");
NetworkModuleService.validateURI("ws://host_literal:80/path/to/ws");
NetworkModuleService.validateURI("wss://host_literal:443/path/to/ws");
}

@Test(expected = IllegalArgumentException.class)
public void failInvalidUri() {
NetworkModuleService.validateURI("no URI at all");
}

@Test(expected = IllegalArgumentException.class)
public void failWithPathOnTcpUri() {
NetworkModuleService.validateURI("tcp://host_literal:1883/somePath");
}

@Test(expected = IllegalArgumentException.class)
public void failWithPathOnSslUri() {
NetworkModuleService.validateURI("ssl://host_literal:1883/somePath");
}

@Test(expected = IllegalArgumentException.class)
public void failWithUnsuppurtedSchemeUri() {
NetworkModuleService.validateURI("unknown://host_literal:1883");
}

/**
* Test for URI parsing with '_' in hostname.
*/
@Test
public void testApplyRFC3986AuthorityPatch() throws URISyntaxException {
URI uri = new URI("tcp://user:password@some_host:666/some_path");
/*
* If the following asserts trigger, then the patch may be no longer required, as Java URI class does the
* RFC3986 parsing itself.
*/
assertNull("patch no longer necessary?", uri.getUserInfo());
assertNull("patch no longer necessary?", uri.getHost());
assertEquals("patch no longer necessary?", -1, uri.getPort());

NetworkModuleService.applyRFC3986AuthorityPatch(uri);

assertEquals("wrong user info", "user:password", uri.getUserInfo());
assertEquals("wrong hostname", "some_host", uri.getHost());
assertEquals("wrong port", 666, uri.getPort());
}

@Test
public void testCreateInstance() throws MqttException {
String brokerUri = "tcp://localhost:666";
MqttConnectOptions options = new MqttConnectOptions();
int conTimeout = 234;
options.setConnectionTimeout(conTimeout);
String clientId = "";

NetworkModule result = NetworkModuleService.createInstance(brokerUri, options, clientId);

assertTrue(result instanceof TCPNetworkModule);
assertEquals(brokerUri, result.getServerURI());
}
}
2 changes: 1 addition & 1 deletion org.eclipse.paho.client.mqttv3/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ Export-Package: org.eclipse.paho.client.mqttv3;version="1.2.1.qualifier",
org.eclipse.paho.client.mqttv3.util;version="1.2.1.qualifier"
Bundle-Vendor: %bundle.provider
Bundle-ActivationPolicy: lazy
Bundle-RequiredExecutionEnvironment: J2SE-1.4
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Import-Package: javax.net;resolution:=optional,
javax.net.ssl;resolution:=optional
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@

package org.eclipse.paho.client.mqttv3;

import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Timer;
Expand All @@ -32,18 +29,12 @@
import java.util.concurrent.ScheduledExecutorService;

import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;

import org.eclipse.paho.client.mqttv3.internal.ClientComms;
import org.eclipse.paho.client.mqttv3.internal.ConnectActionListener;
import org.eclipse.paho.client.mqttv3.internal.DisconnectedMessageBuffer;
import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
import org.eclipse.paho.client.mqttv3.internal.NetworkModule;
import org.eclipse.paho.client.mqttv3.internal.SSLNetworkModule;
import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule;
import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory;
import org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketNetworkModule;
import org.eclipse.paho.client.mqttv3.internal.websocket.WebSocketSecureNetworkModule;
import org.eclipse.paho.client.mqttv3.internal.NetworkModuleService;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttDisconnect;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
import org.eclipse.paho.client.mqttv3.internal.wire.MqttSubscribe;
Expand Down Expand Up @@ -460,7 +451,7 @@ public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence
throw new IllegalArgumentException("ClientId longer than 65535 characters");
}

MqttConnectOptions.validateURI(serverURI);
NetworkModuleService.validateURI(serverURI);

this.serverURI = serverURI;
this.clientId = clientId;
Expand Down Expand Up @@ -547,129 +538,7 @@ private NetworkModule createNetworkModule(String address, MqttConnectOptions opt
// @TRACE 115=URI={0}
log.fine(CLASS_NAME,methodName, "115", new Object[] {address});

NetworkModule netModule;
SocketFactory factory = options.getSocketFactory();

int serverURIType = MqttConnectOptions.validateURI(address);

URI uri;
try {
uri = new URI(address);
// If the returned uri contains no host and the address contains underscores,
// then it's likely that Java did not parse the URI
if(uri.getHost() == null && address.contains("_")){
try {
final Field hostField = URI.class.getDeclaredField("host");
hostField.setAccessible(true);
// Get everything after the scheme://
String shortAddress = address.substring(uri.getScheme().length() + 3);
hostField.set(uri, getHostName(shortAddress));

} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
throw ExceptionHelper.createMqttException(e.getCause());
}

}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Malformed URI: " + address + ", " + e.getMessage());
}

String host = uri.getHost();
int port = uri.getPort(); // -1 if not defined

switch (serverURIType) {
case MqttConnectOptions.URI_TYPE_TCP :
if (port == -1) {
port = 1883;
}
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new TCPNetworkModule(factory, host, port, clientId);
((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_SSL:
if (port == -1) {
port = 8883;
}
SSLSocketFactoryFactory factoryFactory = null;
if (factory == null) {
// try {
factoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
factoryFactory.initialize(sslClientProps, null);
factory = factoryFactory.createSocketFactory(null);
// }
// catch (MqttDirectException ex) {
// throw ExceptionHelper.createMqttException(ex.getCause());
// }
}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}

// Create the network module...
netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
((SSLNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
((SSLNetworkModule)netModule).setSSLHostnameVerifier(options.getSSLHostnameVerifier());
// Ciphers suites need to be set, if they are available
if (factoryFactory != null) {
String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
case MqttConnectOptions.URI_TYPE_WS:
if (port == -1) {
port = 80;
}
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_WSS:
if (port == -1) {
port = 443;
}
SSLSocketFactoryFactory wSSFactoryFactory = null;
if (factory == null) {
wSSFactoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
wSSFactoryFactory.initialize(sslClientProps, null);
factory = wSSFactoryFactory.createSocketFactory(null);

}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}

// Create the network module...
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
// Ciphers suites need to be set, if they are available
if (wSSFactoryFactory != null) {
String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
default:
// This shouldn't happen, as long as validateURI() has been called.
log.fine(CLASS_NAME,methodName, "119", new Object[] {address});
netModule = null;
}
NetworkModule netModule = NetworkModuleService.createInstance(address, options, clientId);
return netModule;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@

import javax.net.SocketFactory;
import javax.net.ssl.HostnameVerifier;

import org.eclipse.paho.client.mqttv3.internal.NetworkModuleService;
import org.eclipse.paho.client.mqttv3.util.Debug;

import java.net.URI;
import java.net.URISyntaxException;

/**
* Holds the set of options that control how the client connects to a server.
*/
Expand Down Expand Up @@ -60,12 +57,6 @@ public class MqttConnectOptions {
*/
public static final int MQTT_VERSION_3_1_1 = 4;

protected static final int URI_TYPE_TCP = 0;
protected static final int URI_TYPE_SSL = 1;
protected static final int URI_TYPE_LOCAL = 2;
protected static final int URI_TYPE_WS = 3;
protected static final int URI_TYPE_WSS = 4;

private int keepAliveInterval = KEEP_ALIVE_INTERVAL_DEFAULT;
private int maxInflight = MAX_INFLIGHT_DEFAULT;
private String willDestination = null;
Expand Down Expand Up @@ -510,51 +501,13 @@ public String[] getServerURIs() {
* durable subscriptions are not valid. The cleansession flag must be set to true if the
* hunt list mode is used</p></li>
* </ol>
* @param array of serverURIs
* @param serverURIs to be used by the client
*/
public void setServerURIs(String[] array) {
for (int i = 0; i < array.length; i++) {
validateURI(array[i]);
}
this.serverURIs = array.clone();
}

/**
* Validate a URI
* @param srvURI The Server URI
* @return the URI type
*/
public static int validateURI(String srvURI) {
try {
URI vURI = new URI(srvURI);
if ("ws".equals(vURI.getScheme())){
return URI_TYPE_WS;
}
else if ("wss".equals(vURI.getScheme())) {
return URI_TYPE_WSS;
}

if ((vURI.getPath() == null) || vURI.getPath().isEmpty()) {
// No op path must be empty
}
else {
throw new IllegalArgumentException(srvURI);
}
if ("tcp".equals(vURI.getScheme())) {
return URI_TYPE_TCP;
}
else if ("ssl".equals(vURI.getScheme())) {
return URI_TYPE_SSL;
}
else if ("local".equals(vURI.getScheme())) {
return URI_TYPE_LOCAL;
}
else {
throw new IllegalArgumentException(srvURI);
}
} catch (URISyntaxException ex) {
throw new IllegalArgumentException(srvURI);
public void setServerURIs(String[] serverURIs) {
for (String serverURI:serverURIs) {
NetworkModuleService.validateURI(serverURI);
}
this.serverURIs = serverURIs.clone();
}

/**
Expand Down
Loading

0 comments on commit 4312c60

Please sign in to comment.