diff --git a/client/generate-client-java.sh b/client/cloud/generate-client-java.sh similarity index 100% rename from client/generate-client-java.sh rename to client/cloud/generate-client-java.sh diff --git a/client/gateway/api/about.html b/client/gateway/api/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/api/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/api/pom.xml b/client/gateway/api/pom.xml new file mode 100644 index 00000000000..73bec3a2553 --- /dev/null +++ b/client/gateway/api/pom.xml @@ -0,0 +1,51 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-api + bundle + Eclipse Kapua :: Gateway Client :: API + The gateway client API definition + + + + org.slf4j + slf4j-api + + + + + ch.qos.logback + logback-classic + test + + + junit + junit + test + + + + \ No newline at end of file diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Application.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Application.java new file mode 100644 index 00000000000..e61d170c818 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Application.java @@ -0,0 +1,50 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +/** + * An application is a sub-unit of a client, focused on handling data + *

+ * The {@link Client} instance is more technical unit, where the {@link Application} + * is more a data oriented unit. + *

+ */ +public interface Application extends AutoCloseable { + + public interface Builder { + + public Application build(); + } + + /** + * Lookup a data controller to an application topic + * + * @param topic + * the topic the controller is bound to, must never be {@code null} + * @return the data controller + */ + public Data data(Topic topic); + + /** + * Lookup a transport controller + *

+ * A transport controller for the client's underlying transport mechanism. + *

+ *

+ * Note: Each application has its own instance and thus can set + * events handler independent of the root client or other applications. + *

+ * + * @return the transport controller + */ + public Transport transport(); +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/BinaryPayloadCodec.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/BinaryPayloadCodec.java new file mode 100644 index 00000000000..156e4bf8e34 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/BinaryPayloadCodec.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.nio.ByteBuffer; + +/** + * A codec for BLOB based payload transports + */ +public interface BinaryPayloadCodec { + + /** + * Encode a {@link Payload} structure into a BLOB + *

+ * The payload information gets encoded into a blob and appended at the end of the + * provided buffer. If the provided buffer is {@code null} a new buffer will be allocated. + *

+ *

+ * Note: The returning buffer may not be the same as the provided buffer. If the + * provided buffer has less remaining space than required a new buffer is allocated and + * returned, which will contain both the existing content as well as the newly appended. + *

+ * + * @param payload + * The payload to encode, must not be {@code null} + * @param buffer + * An optional buffer to append the output to, may be {@code null} + * @return A buffer with the appended payload output, must never be {@code null} + * @throws Exception + * if anything goes wrong + */ + public ByteBuffer encode(Payload payload, ByteBuffer buffer) throws Exception; + + /** + * Decode a {@link Payload} structure from the provided BLOB + * + * @param buffer + * The buffer to read from, must not be {@code null} + * @return the decoded payload structure, may be {@code null} + * @throws Exception + * if anything goes wrong + */ + public Payload decode(ByteBuffer buffer) throws Exception; +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Client.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Client.java new file mode 100644 index 00000000000..94b5b2457a3 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Client.java @@ -0,0 +1,61 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +/** + * A client connection + *

+ * An instance of a client can be obtained by building and instance + * via different protocol stacks. e.g. see the {@code KuraMqttProfile}. + *

+ *

+ * In order to free resources the instance has to be closed when it is no + * longer needed. + *

+ *

+ * Data is exchanged by the use of {@link Application}s. + *

+ */ +public interface Client extends AutoCloseable { + + public interface Builder { + + public Client build() throws Exception; + } + + /** + * Get control over the transport + * + * @return The transport control instance + */ + public Transport transport(); + + /** + * Create a new application instance + *

+ * This method only returns a new builder which will + * create a new instance once {@link Application.Builder#build()} is called. + * Before that the application is not built and no resources are claimed. + *

+ *

+ * Application IDs are unique. If an application ID is already allocated it + * cannot be allocated a second time. The second call to {@link Application.Builder#build()} will fail. + * However this application ID only allocated once the call to {@link Application.Builder#build()} + * succeeded. + *

+ * + * @param applicationId + * The ID of the application to create + * @return the new {@link Application.Builder} instance + */ + public Application.Builder buildApplication(String applicationId); +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Credentials.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Credentials.java new file mode 100644 index 00000000000..523b7973f3e --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Credentials.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +public final class Credentials { + + private Credentials() { + } + + public static class UserAndPassword { + + private final String username; + private final char[] password; + + private UserAndPassword(final String username, final char[] password) { + this.username = username; + this.password = password; + } + + public String getUsername() { + return username; + } + + public char[] getPassword() { + return password; + } + + public String getPasswordAsString() { + if (password == null) { + return null; + } + + return String.valueOf(password); + } + } + + public static UserAndPassword userAndPassword(final String username, final String password) { + return new UserAndPassword(username, password != null ? password.toCharArray() : null); + } + + public static UserAndPassword userAndPassword(final String username, final char[] password) { + return new UserAndPassword(username, password); + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Data.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Data.java new file mode 100644 index 00000000000..59cfdf2d2fc --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Data.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +/** + * An interface for control data + *

+ * An instance of this is bound to a topic and can be looked up by a call + * to {@link Application#data(Topic)}. + *

+ */ +public interface Data extends Sender { + + /** + * Receive messages on this data topic + *

+ * Subscriptions will automatically be re-established after a connection loss. + *

+ * + * @param handler + * the handler which should process received messages + * @throws Exception + * if anything goes wrong on the subscription process + */ + public default void subscribe(final MessageHandler handler) throws Exception { + subscribe(handler, Errors::ignore); + } + + /** + * Receive messages and handle reception errors on this data topic + *

+ * Subscriptions will automatically be re-established after a connection loss. + *

+ * + * @param handler + * the handler which should process received messages + * @param errorHandler + * the handler which should process received messages which got received + * but could not be properly parsed + * @throws Exception + * if anything goes wrong on the subscription process + */ + public void subscribe(MessageHandler handler, ErrorHandler errorHandler) throws Exception; +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ErrorHandler.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ErrorHandler.java new file mode 100644 index 00000000000..2de88d98e70 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ErrorHandler.java @@ -0,0 +1,20 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.util.Optional; + +@FunctionalInterface +public interface ErrorHandler { + + public void handleError(Throwable e, Optional payload) throws X; +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Errors.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Errors.java new file mode 100644 index 00000000000..fce5cbbba6f --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Errors.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.util.Optional; +import java.util.function.BiConsumer; + +public final class Errors { + + private static final ErrorHandler IGNORE = Errors::ignore; + + private Errors() { + } + + public static ErrorHandler ignore() { + return IGNORE; + } + + public static ErrorHandler handle(final BiConsumer> handler) { + return new ErrorHandler() { + + @Override + public void handleError(final Throwable e, final Optional payload) throws RuntimeException { + handler.accept(e, payload); + } + }; + } + + public static void ignore(final Throwable e, final Optional payload) { + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageErrorHandler.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageErrorHandler.java new file mode 100644 index 00000000000..835c37a5cf3 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageErrorHandler.java @@ -0,0 +1,18 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +@FunctionalInterface +public interface MessageErrorHandler { + + public void handleError(Throwable e); +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageHandler.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageHandler.java new file mode 100644 index 00000000000..98f6c6ff5d5 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/MessageHandler.java @@ -0,0 +1,18 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +@FunctionalInterface +public interface MessageHandler { + + public void handleMessage(Payload payload); +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Module.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Module.java new file mode 100644 index 00000000000..1598cfdbacc --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Module.java @@ -0,0 +1,30 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +public interface Module { + + public default void initialize(final ModuleContext context) { + } + + public default void applicationAdded(final String applicationId) { + } + + public default void applicationRemoved(final String applicationId) { + } + + public default void connected() { + } + + public default void disconnected() { + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ModuleContext.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ModuleContext.java new file mode 100644 index 00000000000..e0dfa196d0f --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/ModuleContext.java @@ -0,0 +1,17 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +public interface ModuleContext { + + public Client getClient(); +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Payload.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Payload.java new file mode 100644 index 00000000000..26e580fdd0b --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Payload.java @@ -0,0 +1,120 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import static java.time.Instant.now; +import static java.util.Collections.singletonMap; +import static java.util.Collections.unmodifiableMap; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Payload data + */ +public class Payload { + + public static class Builder { + + private Instant timestamp; + + private final Map values = new HashMap<>(); + + public Builder() { + timestamp = Instant.now(); + } + + public Instant timestamp() { + return timestamp; + } + + public Builder timestamp(final Instant timestamp) { + Objects.requireNonNull(timestamp); + + this.timestamp = timestamp; + return this; + } + + public Map values() { + return values; + } + + public Builder values(final Map values) { + Objects.requireNonNull(values); + + this.values.clear(); + this.values.putAll(values); + + return this; + } + + public Builder put(final String key, final Object value) { + Objects.requireNonNull(key); + + values.put(key, value); + return this; + } + + public Payload build() { + return new Payload(timestamp, values, true); + } + } + + private final Instant timestamp; + private final Map values; + + private Payload(final Instant timestamp, final Map values, final boolean cloneValues) { + this.timestamp = timestamp; + this.values = unmodifiableMap(cloneValues ? new HashMap<>(values) : values); + } + + public Instant getTimestamp() { + return timestamp; + } + + public Map getValues() { + return values; + } + + @Override + public String toString() { + return String.format("[Payload - timestamp: %s, values: %s]", timestamp, values); + } + + public static Payload of(final String key, final Object value) { + Objects.requireNonNull(key); + + return new Payload(now(), singletonMap(key, value), false); + } + + public static Payload of(final Map values) { + Objects.requireNonNull(values); + + return new Payload(now(), values, true); + } + + public static Payload of(final Instant timestamp, final String key, final Object value) { + Objects.requireNonNull(timestamp); + Objects.requireNonNull(key); + + return new Payload(timestamp, singletonMap(key, value), false); + } + + public static Payload of(final Instant timestamp, final Map values) { + Objects.requireNonNull(timestamp); + Objects.requireNonNull(values); + + return new Payload(timestamp, values, true); + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Sender.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Sender.java new file mode 100644 index 00000000000..319a0cc3dda --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Sender.java @@ -0,0 +1,48 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; + +/** + * An interface to publish data + * + * @param + * base class of sender errors + */ +public interface Sender { + + public void send(Payload payload) throws X; + + public default void send(final Payload.Builder payload) throws X { + send(payload.build()); + } + + public default Sender errors(final ErrorHandler errorHandler) { + return new Sender() { + + @Override + public void send(final Payload payload) throws Y { + requireNonNull(payload); + + try { + Sender.this.send(payload); + } catch (final Throwable e) { + errorHandler.handleError(e, Optional.of(payload)); + } + } + }; + + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Topic.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Topic.java new file mode 100644 index 00000000000..f887ea2589c --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Topic.java @@ -0,0 +1,133 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * A topic + *

+ * Note: This is not a technical MQTT topic, but an internal data topic. + * For this reason special topics like wildcards are not supported and will cause + * an {@link Exception}. + *

+ */ +public final class Topic { + + private final List segments; + + private Topic(final List segments) { + this.segments = Collections.unmodifiableList(segments); + } + + public List getSegments() { + return segments; + } + + public Stream stream() { + return segments.stream(); + } + + @Override + public String toString() { + return String.join("/", segments); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (segments == null ? 0 : segments.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Topic other = (Topic) obj; + if (segments == null) { + if (other.segments != null) { + return false; + } + } else if (!segments.equals(other.segments)) { + return false; + } + return true; + } + + public static Topic split(String path) { + if (path == null) { + return null; + } + + path = path.replaceAll("(^/+|/$)+", ""); + + if (path.isEmpty()) { + return null; + } + + return new Topic(Arrays.asList(path.split("\\/+"))); + } + + public static Topic of(final List segments) { + if (segments == null || segments.isEmpty()) { + return null; + } + + segments.forEach(Topic::ensureNotSpecial); + + return new Topic(new ArrayList<>(segments)); + } + + public static Topic of(final String first, final String... strings) { + if (first == null) { + return null; + } + + if (strings == null || strings.length <= 0) { + return new Topic(Collections.singletonList(ensureNotSpecial(first))); + } + + final List segments = new ArrayList<>(1 + strings.length); + segments.add(ensureNotSpecial(first)); + for (final String segment : strings) { + segments.add(ensureNotSpecial(segment)); + } + return new Topic(segments); + } + + public static String ensureNotSpecial(final String segment) { + if (segment == null || segment.isEmpty()) { + throw new IllegalArgumentException("Segment must not be null or empty"); + } else if ("#".equals(segment)) { + throw new IllegalArgumentException("Wildcard topics are not allowed"); + } else if ("+".equals(segment)) { + throw new IllegalArgumentException("Wildcard topics are not allowed"); + } else if (segment.contains("/")) { + throw new IllegalArgumentException("Segments must not contain slashes. Use Topic.split to parse a multi-segment topic string."); + } + return segment; + } + +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Transport.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Transport.java new file mode 100644 index 00000000000..66e0a552608 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/Transport.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.util.concurrent.Semaphore; +import java.util.function.Consumer; + +/** + * A control interface on the underlying client transport + *

+ * Note: There is only one set of transport events available for the client. + * Setting a new set of transport state listeners will clear the previously set listeners. + *

+ */ +public interface Transport { + + public interface TransportEvents { + + public void connected(Runnable runnable); + + public void disconnected(Runnable runnable); + } + + /** + * Set a state listener + * + *

+ * The listener will be called immediately after setting with the + * last known state. + *

+ * + * @param stateChange + * the listener to transport state changes + */ + public void state(Consumer stateChange); + + /** + * This method allows to atomically set a state listener using simple runnable. + * + *

+ * This method is intended to be used with Java lambdas where each state change + * (connected, disconnected) is mapped to one lambda. However, as the state change + * will be initially reported it might happen that the state actually changes between + * setting the connect and disconnect handler. This way there would be no way to properly + * report the initial state. + *

+ *

+ * Setting the event handlers using this methods works by updating + * the provided {@link TransportEvents} fields inside the provided consumer. The + * consumer will only be called once inside this method. The event listeners will + * then be set atomically. + *

+ * + *
+     * client.transport().events( events {@code ->} {
+     *   events.connected ( () {@code ->} System.out.println ("Connected") );
+     *   events.disconnected ( () {@code ->} System.out.println ("Disconnected") );
+     * });
+     * 
+ * + * @param events + * code to update the {@link TransportEvents} + * + */ + public default void events(final Consumer events) { + class TransportEventsImpl implements TransportEvents { + + private Runnable connected; + private Runnable disconnected; + + @Override + public void connected(final Runnable runnable) { + connected = runnable; + } + + @Override + public void disconnected(final Runnable runnable) { + disconnected = runnable; + } + + } + + final TransportEventsImpl impl = new TransportEventsImpl(); + + events.accept(impl); + + state(state -> { + if (state) { + if (impl.connected != null) { + impl.connected.run(); + } + } else { + if (impl.disconnected != null) { + impl.disconnected.run(); + } + } + }); + } + + /** + * Wait for the connection to be established + *

+ * Note: This method will reset the transport listeners. + *

+ * + * @param transport + * to wait on + * @throws InterruptedException + * if the wait got interrupted + */ + public static void waitForConnection(final Transport transport) throws InterruptedException { + + final Semaphore sem = new Semaphore(0); + + transport.state(state -> { + if (state) { + sem.release(); + } + }); + + sem.acquire(); + } +} diff --git a/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/package-info.java b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/package-info.java new file mode 100644 index 00000000000..891a131b535 --- /dev/null +++ b/client/gateway/api/src/main/java/org/eclipse/kapua/client/gateway/package-info.java @@ -0,0 +1,38 @@ +/** + * Gateway client SDK for Eclipse Kapua™ + *

+ * Provides easy access to Eclipse Kapua, acting as a gateway device. + *

+ *
+try (Client client = KuraMqttProfile.newProfile(FuseClient.Builder::new)
+  .accountName("kapua-sys")
+  .clientId("foo-bar-1")
+  .brokerUrl("tcp://localhost:1883")
+  .credentials(userAndPassword("kapua-broker", "kapua-password"))
+  .build()) {
+
+  try (Application application = client.buildApplication("app1").build()) {
+
+    // subscribe to a topic
+
+    application.data(Topic.of("my", "receiver")).subscribe(message {@code ->} {
+      System.out.format("Received: %s%n", message);
+    });
+
+    // cache sender instance
+
+    {@code Sender} sender = application
+      .data(Topic.of("my", "sender"))
+      .errors(ignore());
+
+    int i = 0;
+    while (true) {
+      // send
+      sender.send(Payload.of("counter", i++));
+      Thread.sleep(1000);
+    }
+  }
+}
+ * 
+ */ +package org.eclipse.kapua.client.gateway; \ No newline at end of file diff --git a/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/CredentialsTest.java b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/CredentialsTest.java new file mode 100644 index 00000000000..deb91dadd20 --- /dev/null +++ b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/CredentialsTest.java @@ -0,0 +1,52 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import org.eclipse.kapua.client.gateway.Credentials; +import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword; +import org.junit.Assert; +import org.junit.Test; + +public class CredentialsTest { + + @Test + public void testNull1() { + final UserAndPassword c = Credentials.userAndPassword(null, (char[]) null); + Assert.assertNull(c.getUsername()); + Assert.assertNull(c.getPassword()); + Assert.assertNull(c.getPasswordAsString()); + } + + @Test + public void testNull2() { + final UserAndPassword c = Credentials.userAndPassword(null, (String) null); + Assert.assertNull(c.getUsername()); + Assert.assertNull(c.getPassword()); + Assert.assertNull(c.getPasswordAsString()); + } + + @Test + public void testNonNull1() { + final UserAndPassword c = Credentials.userAndPassword("user", "password".toCharArray()); + Assert.assertNotNull(c.getUsername()); + Assert.assertNotNull(c.getPassword()); + Assert.assertNotNull(c.getPasswordAsString()); + } + + @Test + public void testNonNull2() { + final UserAndPassword c = Credentials.userAndPassword("user", "password"); + Assert.assertNotNull(c.getUsername()); + Assert.assertNotNull(c.getPassword()); + Assert.assertNotNull(c.getPasswordAsString()); + } +} diff --git a/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/ErrorsTest.java b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/ErrorsTest.java new file mode 100644 index 00000000000..6295aa9cc40 --- /dev/null +++ b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/ErrorsTest.java @@ -0,0 +1,36 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.util.Optional; + +import org.eclipse.kapua.client.gateway.Errors; +import org.junit.Test; + +public class ErrorsTest { + + @Test + public void test1() { + Errors.ignore().handleError(new Exception(), Optional.empty()); + } + + @Test + public void test2() { + Errors.handle((error, payload) -> { + }).handleError(new Exception(), Optional.empty()); + } + + @Test + public void test3() { + Errors.handle(Errors::ignore).handleError(new Exception(), Optional.empty()); + } +} diff --git a/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/PayloadTest.java b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/PayloadTest.java new file mode 100644 index 00000000000..61945e1a81d --- /dev/null +++ b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/PayloadTest.java @@ -0,0 +1,117 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import java.time.Instant; +import java.util.Collections; + +import org.eclipse.kapua.client.gateway.Payload; +import org.junit.Assert; +import org.junit.Test; + +public class PayloadTest { + + @Test(expected = NullPointerException.class) + public void testNull1() { + Payload.of((String) null, null); + } + + @Test(expected = NullPointerException.class) + public void testNull2() { + Payload.of(null); + } + + @Test(expected = NullPointerException.class) + public void testNull3() { + Payload.of(null, null, null); + } + + @Test(expected = NullPointerException.class) + public void testNull4() { + Payload.of((Instant) null, null); + } + + @Test + public void test1() { + Payload p = Payload.of("foo", 1); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } + + @Test + public void test2() { + Payload p = Payload.of(Collections.singletonMap("foo", 1)); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } + + @Test + public void test3() { + Payload p = Payload.of(Instant.now(), "foo", 1); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } + + @Test + public void test4() { + Payload p = Payload.of(Instant.now(), Collections.singletonMap("foo", 1)); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } + + @Test + public void testBuilder1() { + final Payload p = new Payload.Builder().timestamp(Instant.now()).put("foo", 1).build(); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } + + @Test(expected = NullPointerException.class) + public void testBuilder2() { + new Payload.Builder().timestamp(null).build(); + } + + @Test(expected = NullPointerException.class) + public void testBuilder3() { + new Payload.Builder().values(null).build(); + } + + @Test(expected = NullPointerException.class) + public void testBuilder4() { + new Payload.Builder().put(null, null).build(); + } + + @Test + public void testBuilder5() { + final Payload p = new Payload.Builder().timestamp(Instant.now()).values(Collections.singletonMap("foo", 1)).build(); + + Assert.assertNotNull(p); + Assert.assertEquals(1, p.getValues().size()); + Assert.assertEquals(1, p.getValues().get("foo")); + Assert.assertTrue(!Instant.now().isBefore(p.getTimestamp())); + } +} diff --git a/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/TopicTest.java b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/TopicTest.java new file mode 100644 index 00000000000..f9efac4eeca --- /dev/null +++ b/client/gateway/api/src/test/java/org/eclipse/kapua/client/gateway/TopicTest.java @@ -0,0 +1,186 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.eclipse.kapua.client.gateway.Topic; +import org.junit.Assert; +import org.junit.Test; + +public class TopicTest { + + @Test + public void test1() { + final Topic topic = Topic.of("foo"); + Assert.assertEquals(Arrays.asList("foo"), topic.getSegments()); + } + + @Test + public void test2() { + final Topic topic = Topic.of("foo", "bar"); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void test3() { + final Topic topic = Topic.of(new LinkedList<>(Arrays.asList("foo", "bar"))); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void testEmpty1() { + final Topic topic = Topic.of((List) null); + Assert.assertNull(topic); + } + + @Test + public void testEmpty2() { + final Topic topic = Topic.of(Collections.emptyList()); + Assert.assertNull(topic); + } + + @Test + public void testEmpty3() { + final Topic topic = Topic.of((String) null); + Assert.assertNull(topic); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmpty4() { + Topic.of("foo", "bar", null); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial1() { + Topic.of("foo", "#"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial2() { + Topic.of("foo", "+"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial3() { + Topic.of("foo", ""); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial4() { + Topic.of("foo", "foo/bar"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial4a() { + Topic.of("foo", "/bar"); + } + + @Test(expected = IllegalArgumentException.class) + public void testSpecial4b() { + Topic.of("foo", "foo/"); + } + + @Test + public void testSplit1() { + final Topic topic = Topic.split("foo/bar"); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void testSplit2() { + final Topic topic = Topic.split("foo//bar"); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void testSplit3() { + final Topic topic = Topic.split("/foo//bar"); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void testSplit4() { + final Topic topic = Topic.split("/foo//bar//"); + Assert.assertEquals(Arrays.asList("foo", "bar"), topic.getSegments()); + } + + @Test + public void testSplit5() { + final Topic topic = Topic.split("//"); + Assert.assertNull(topic); + } + + @Test + public void testSplitNull() { + final Topic topic = Topic.split(null); + Assert.assertNull(topic); + } + + @Test + public void testSplitEmpty() { + final Topic topic = Topic.split(""); + Assert.assertNull(topic); + } + + @Test + public void testEquals1() { + final Topic t1 = Topic.split("foo/bar/baz"); + final Topic t2 = Topic.split("foo/bar/baz"); + assertEquals(t1, t2); + } + + @Test + public void testEquals2() { + final Topic t1 = Topic.split("foo/bar/baz"); + final Topic t2 = Topic.of("foo", "bar", "baz"); + assertEquals(t1, t2); + } + + @Test + public void testNotEquals1() { + final Topic t1 = Topic.split("foo/bar"); + final Topic t2 = Topic.split("foo/baz"); + assertNotEquals(t1, t2); + } + + @Test + public void testStream() { + final String result = Topic.split("foo/bar").stream().collect(Collectors.joining()); + assertEquals("foobar", result); + } + + @Test + public void testToString() { + assertEquals("foo/bar", Topic.of("foo", "bar").toString()); + } + + @Test + public void testHashCode() { + final Map map = new HashMap<>(); + map.put(Topic.of("foo", "bar"), 1); + map.put(Topic.of("foo", "baz"), 2); + + assertEquals(1, map.get(Topic.split("foo/bar"))); + assertEquals(2, map.get(Topic.split("foo/baz"))); + } + +} diff --git a/client/gateway/features/karaf/pom.xml b/client/gateway/features/karaf/pom.xml new file mode 100644 index 00000000000..f4ee6e4d8f0 --- /dev/null +++ b/client/gateway/features/karaf/pom.xml @@ -0,0 +1,108 @@ + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway-features + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-feature-karaf + feature + Eclipse Kapua :: Gateway Client :: Features :: Karaf + + + 4.1.1 + + + + + org.apache.karaf.features + framework + ${karaf.version} + kar + provided + + + + org.apache.karaf.features + standard + ${karaf.version} + features + xml + provided + + + + org.eclipse.kapua + kapua-client-gateway-api + + + org.eclipse.kapua + kapua-client-gateway-spi + + + org.eclipse.kapua + kapua-client-gateway-profile-kura + + + org.eclipse.kapua + kapua-client-gateway-provider-fuse + + + org.eclipse.kapua + kapua-client-gateway-provider-paho + + + + + + + org.apache.karaf.tooling + karaf-maven-plugin + true + + + verify + verify + + verify + + + + mvn:org.apache.karaf.features/framework/${karaf.version}/xml/features + mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features + file:${project.build.directory}/feature/feature.xml + + org.apache.karaf.features:framework + 1.8 + + framework + + + kapua-client-gateway-kura-paho + kapua-client-gateway-kura-fuse + + + + + + + + + diff --git a/client/gateway/features/karaf/src/main/feature/feature.xml b/client/gateway/features/karaf/src/main/feature/feature.xml new file mode 100644 index 00000000000..5bf1d0fa813 --- /dev/null +++ b/client/gateway/features/karaf/src/main/feature/feature.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + kapua-client-gateway + kapua-client-gateway-profile-kura + kapua-client-gateway-provider-mqtt + + mvn:${project.groupId}/kapua-client-gateway-provider-fuse/${project.version} + + mvn:org.fusesource.mqtt-client/mqtt-client/1.14 + mvn:org.fusesource.hawtbuf/hawtbuf/1.11 + mvn:org.fusesource.hawtdispatch/hawtdispatch/1.22 + mvn:org.fusesource.hawtdispatch/hawtdispatch-transport/1.22 + + + + kapua-client-gateway + kapua-client-gateway-profile-kura + kapua-client-gateway-provider-mqtt + + mvn:${project.groupId}/kapua-client-gateway-provider-paho/${project.version} + + mvn:org.eclipse.paho/org.eclipse.paho.client.mqttv3/1.1.1 + + + \ No newline at end of file diff --git a/client/gateway/features/pom.xml b/client/gateway/features/pom.xml new file mode 100644 index 00000000000..4a80b995b26 --- /dev/null +++ b/client/gateway/features/pom.xml @@ -0,0 +1,31 @@ + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client + 0.2.0-SNAPSHOT + + + kapua-client-gateway-features + pom + Eclipse Kapua :: Gateway Client :: Features + + + karaf + + + diff --git a/client/gateway/pom.xml b/client/gateway/pom.xml new file mode 100644 index 00000000000..10a9f9d3dc3 --- /dev/null +++ b/client/gateway/pom.xml @@ -0,0 +1,64 @@ + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client + 0.2.0-SNAPSHOT + + + kapua-client-gateway + pom + + + + + + api + spi + + + + provider + + + + profile/kura + + + + features + + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + about.html + + + + + + + diff --git a/client/gateway/profile/kura/about.html b/client/gateway/profile/kura/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/profile/kura/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/profile/kura/pom.xml b/client/gateway/profile/kura/pom.xml new file mode 100644 index 00000000000..31c68ab01dc --- /dev/null +++ b/client/gateway/profile/kura/pom.xml @@ -0,0 +1,119 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-profile-kura + bundle + Eclipse Kapua :: Gateway Client :: Profile :: Eclipse Kura + The Eclipse Kura™ communication stack profile + + + + + org.eclipse.kapua + kapua-client-gateway-api + + + + org.eclipse.kapua + kapua-client-gateway-provider-mqtt + + + + org.eclipse.kapua + kapua-client-gateway-spi + + + + org.slf4j + slf4j-api + + + + com.google.protobuf + protobuf-java + + + + + + org.eclipse.kapua + kapua-client-gateway-provider-fuse + test + + + + org.eclipse.kapua + kapua-client-gateway-provider-paho + test + + + + ch.qos.logback + logback-classic + test + + + + junit + junit + test + + + + + + + + com.github.os72 + protoc-jar-maven-plugin + + + generate-sources + + run + + + ${protobuf.version} + true + + src/main/protobuf + + + + + + + + maven-javadoc-plugin + + org.eclipse.kapua.gateway.client.kura.payload + + + + + + + \ No newline at end of file diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBinaryPayloadCodec.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBinaryPayloadCodec.java new file mode 100644 index 00000000000..b445aa9278d --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBinaryPayloadCodec.java @@ -0,0 +1,74 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Map; +import java.util.Objects; + +import org.eclipse.kapua.client.gateway.BinaryPayloadCodec; +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.kura.internal.Metrics; +import org.eclipse.kapua.client.gateway.utils.Buffers; +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto; +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload; + +public class KuraBinaryPayloadCodec implements BinaryPayloadCodec { + + public static class Builder { + + public KuraBinaryPayloadCodec build() { + return new KuraBinaryPayloadCodec(); + } + } + + private KuraBinaryPayloadCodec() { + } + + @Override + public ByteBuffer encode(final Payload payload, final ByteBuffer buffer) throws Exception { + + Objects.requireNonNull(payload); + + final KuraPayloadProto.KuraPayload.Builder builder = KuraPayload.newBuilder(); + builder.setTimestamp(payload.getTimestamp().toEpochMilli()); + Metrics.buildMetrics(builder, payload.getValues()); + + final byte[] data = builder.build().toByteArray(); + + if (buffer == null) { + // create a wrapped buffer + return Buffers.wrap(data); + } else if (buffer.remaining() < data.length) { + // create a new, merged buffer + buffer.flip(); + final ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining() + data.length); + newBuffer.put(buffer); + newBuffer.put(data); + return newBuffer; + } else { + buffer.put(data); + return buffer; + } + } + + @Override + public Payload decode(final ByteBuffer buffer) throws Exception { + Objects.requireNonNull(buffer); + + final KuraPayload payload = KuraPayload.parseFrom(Buffers.toByteArray(buffer)); + final Map values = Metrics.extractMetrics(payload); + return Payload.of(Instant.ofEpochMilli(payload.getTimestamp()), values); + } + +} diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBirthCertificateModule.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBirthCertificateModule.java new file mode 100644 index 00000000000..8e050972a0e --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraBirthCertificateModule.java @@ -0,0 +1,179 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +import org.eclipse.kapua.client.gateway.Client; +import org.eclipse.kapua.client.gateway.Module; +import org.eclipse.kapua.client.gateway.ModuleContext; +import org.eclipse.kapua.client.gateway.kura.internal.Metrics; +import org.eclipse.kapua.client.gateway.mqtt.MqttClient; +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KuraBirthCertificateModule implements Module { + + private static final Logger logger = LoggerFactory.getLogger(KuraBirthCertificateModule.class); + + @FunctionalInterface + public interface Provider { + + public void provideData(Map values); + + public static final Provider JVM = new Provider() { + + @Override + public void provideData(final Map values) { + values.put("jvm_name", System.getProperty("java.vm.name")); + values.put("jvm_version", System.getProperty("java.version")); + + values.put("os", System.getProperty("os.name")); + values.put("os_version", System.getProperty("os.version")); + values.put("os_arch", System.getProperty("os.arch")); + } + + }; + + public static final Provider RUNTIME = new Provider() { + + @Override + public void provideData(final Map values) { + values.put("available_processors", Integer.toString(Runtime.getRuntime().availableProcessors())); + values.put("total_memory", Long.toString(Runtime.getRuntime().totalMemory())); + } + + }; + } + + public static class Builder { + + private final String accountName; + + private final Set providers = new HashSet<>(); + + private Builder(final String accountName) { + this.accountName = accountName; + } + + public Builder defaultProviders() { + providers.add(Provider.JVM); + providers.add(Provider.RUNTIME); + return this; + } + + public Builder provider(final Provider provider) { + Objects.requireNonNull(provider); + providers.add(provider); + return this; + } + + public Builder providers(final Collection providers) { + Objects.requireNonNull(providers); + this.providers.addAll(providers); + return this; + } + + public Set providers() { + return Collections.unmodifiableSet(providers); + } + + public KuraBirthCertificateModule build() { + return new KuraBirthCertificateModule(accountName, providers()); + } + } + + public static Builder newBuilder(final String accountName) { + return new Builder(accountName); + } + + private final Set applications = new TreeSet<>(); + + private MqttClient client; + + private final String accountName; + + private final Set providers; + + private KuraBirthCertificateModule(final String accountName, final Set providers) { + this.accountName = accountName; + this.providers = new HashSet<>(providers); + } + + @Override + public void applicationAdded(final String applicationId) { + logger.info("Application added: {}", applicationId); + if (applications.add(applicationId)) { + sendBirthCertificate(); + } + } + + @Override + public void applicationRemoved(final String applicationId) { + logger.info("Application removed: {}", applicationId); + if (applications.remove(applicationId)) { + sendBirthCertificate(); + } + } + + @Override + public void connected() { + sendBirthCertificate(); + } + + @Override + public void initialize(final ModuleContext context) { + final Client client = context.getClient(); + if (!(client instanceof MqttClient)) { + throw new IllegalStateException(String.format("%s can only be used with an %s based instance", KuraBirthCertificateModule.class.getSimpleName(), MqttClient.class.getName())); + } + this.client = (MqttClient) client; + } + + private void sendBirthCertificate() { + logger.debug("Sending birth certificate"); + + final Map values = new HashMap<>(); + + for (final Provider provider : providers) { + provider.provideData(values); + } + + values.put("application_ids", String.join(",", applications)); + + // build payload + + final KuraPayload.Builder builder = KuraPayload.newBuilder(); + Metrics.buildMetrics(builder, values); + final ByteBuffer buffer = ByteBuffer.wrap(builder.build().toByteArray()); + + // publish MQTT payload + + final String clientId = client.getMqttClientId(); + + try { + client.publishMqtt(String.format("$EDC/%s/%s/MQTT/BIRTH", accountName, clientId), buffer); + } catch (final Exception e) { + logger.warn("Failed to publish birth certificate", e); + } + } + +} diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraNamespace.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraNamespace.java new file mode 100644 index 00000000000..31ff82d3fde --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/KuraNamespace.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura; + +import static java.util.Objects.requireNonNull; +import static org.eclipse.kapua.client.gateway.Topic.ensureNotSpecial; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.mqtt.MqttNamespace; + +public class KuraNamespace implements MqttNamespace { + + public static final class Builder { + + private String accountName; + + public Builder accountName(final String accountName) { + requireNonNull(accountName); + ensureNotSpecial(accountName); + + this.accountName = accountName; + return this; + } + + public String accountName() { + return accountName; + } + + public KuraNamespace build() { + if (accountName == null || accountName.isEmpty()) { + throw new IllegalArgumentException("'accountName' must be set"); + } + + return new KuraNamespace(accountName); + } + } + + private final String accountName; + + private KuraNamespace(final String accountName) { + this.accountName = accountName; + } + + @Override + public String dataTopic(final String clientId, final String applicationId, final Topic topic) { + ensureNotSpecial(clientId); + ensureNotSpecial(applicationId); + + return Stream.concat( + Stream.of( + accountName, + clientId, + applicationId), + topic.stream()) + .collect(Collectors.joining("/")); + } + +} diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/internal/Metrics.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/internal/Metrics.java new file mode 100644 index 00000000000..5c59c2c6e2c --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/kura/internal/Metrics.java @@ -0,0 +1,156 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura.internal; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload; +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload.KuraMetric; +import org.eclipse.kapua.gateway.client.kura.payload.KuraPayloadProto.KuraPayload.KuraMetric.ValueType; + +import com.google.protobuf.ByteString; + +public final class Metrics { + + private Metrics() { + } + + /** + * Convert plain key value map into a Kura metric structure
+ * Only the supported Kura values types must be used (String, boolean, int, + * long, float, double, byte[]) + * + * @param builder + * the builder to append the metrics to + * @param metrics + * the metrics map + * @throws IllegalArgumentException + * in case of an unsupported value type + */ + public static void buildMetrics(final KuraPayload.Builder builder, final Map metrics) { + Objects.requireNonNull(metrics); + + for (final Map.Entry metric : metrics.entrySet()) { + addMetric(builder, metric.getKey(), metric.getValue()); + } + } + + public static void buildBody(final KuraPayload.Builder builder, final ByteBuffer body) { + if (body == null) { + return; + } + + Objects.requireNonNull(builder); + + builder.setBody(ByteString.copyFrom(body)); + } + + public static void addMetric(final KuraPayload.Builder builder, final String key, final Object value) { + final KuraMetric.Builder b = KuraMetric.newBuilder(); + b.setName(key); + + if (value == null) { + return; + } else if (value instanceof Boolean) { + b.setType(ValueType.BOOL); + b.setBoolValue((boolean) value); + } else if (value instanceof Integer) { + b.setType(ValueType.INT32); + b.setIntValue((int) value); + } else if (value instanceof String) { + b.setType(ValueType.STRING); + b.setStringValue((String) value); + } else if (value instanceof Long) { + b.setType(ValueType.INT64); + b.setLongValue((Long) value); + } else if (value instanceof Double) { + b.setType(ValueType.DOUBLE); + b.setDoubleValue((Double) value); + } else if (value instanceof Float) { + b.setType(ValueType.FLOAT); + b.setFloatValue((Float) value); + } else if (value instanceof byte[]) { + b.setType(ValueType.BYTES); + b.setBytesValue(ByteString.copyFrom((byte[]) value)); + } else { + throw new IllegalArgumentException(String.format("Illegal metric data type: %s", value.getClass())); + } + + builder.addMetric(b); + } + + public static Map extractMetrics(final KuraPayload payload) { + if (payload == null) { + return null; + } + return extractMetrics(payload.getMetricList()); + } + + public static Map extractMetrics(final List metricList) { + if (metricList == null) { + return null; + } + + /* + * We are using a TreeMap in order to have a stable order of properties + */ + final Map result = new TreeMap<>(); + + for (final KuraMetric metric : metricList) { + final String name = metric.getName(); + switch (metric.getType()) { + case BOOL: + result.put(name, metric.getBoolValue()); + break; + case BYTES: + result.put(name, metric.getBytesValue().toByteArray()); + break; + case DOUBLE: + result.put(name, metric.getDoubleValue()); + break; + case FLOAT: + result.put(name, metric.getFloatValue()); + break; + case INT32: + result.put(name, metric.getIntValue()); + break; + case INT64: + result.put(name, metric.getLongValue()); + break; + case STRING: + result.put(name, metric.getStringValue()); + break; + } + } + + return result; + } + + public static String getAsString(final Map metrics, final String key) { + return getAsString(metrics, key, null); + } + + public static String getAsString(final Map metrics, final String key, final String defaultValue) { + final Object value = metrics.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof String) { + return (String) value; + } + return defaultValue; + } +} diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/KuraMqttProfile.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/KuraMqttProfile.java new file mode 100644 index 00000000000..6f98a03b9c2 --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/KuraMqttProfile.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.profile.kura; + +import static java.util.Objects.requireNonNull; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.eclipse.kapua.client.gateway.Client; +import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword; +import org.eclipse.kapua.client.gateway.kura.KuraBinaryPayloadCodec; +import org.eclipse.kapua.client.gateway.kura.KuraBirthCertificateModule; +import org.eclipse.kapua.client.gateway.kura.KuraNamespace; +import org.eclipse.kapua.client.gateway.mqtt.MqttClient; + +public class KuraMqttProfile> { + + public static > KuraMqttProfile newProfile(final Supplier builderSupplier) { + requireNonNull(builderSupplier); + return new KuraMqttProfile<>(builderSupplier); + } + + private final Supplier builderSupplier; + private String accountName; + private String brokerUrl; + private String clientId; + private UserAndPassword userAndPassword; + private Consumer customizer; + + private KuraMqttProfile(final Supplier builderSupplier) { + this.builderSupplier = builderSupplier; + } + + public KuraMqttProfile accountName(final String accountName) { + this.accountName = accountName; + return this; + } + + public KuraMqttProfile brokerUrl(final String brokerUrl) { + this.brokerUrl = brokerUrl; + return this; + } + + public KuraMqttProfile customizer(Consumer customizer) { + this.customizer = customizer; + return this; + } + + public KuraMqttProfile clientId(final String clientId) { + this.clientId = clientId; + return this; + } + + public KuraMqttProfile credentials(final UserAndPassword userAndPassword) { + this.userAndPassword = userAndPassword; + return this; + } + + public Client build() throws Exception { + validate(); + + B builder = builderSupplier.get() + .clientId(this.clientId) + .broker(this.brokerUrl) + .credentials(this.userAndPassword) + .codec(new KuraBinaryPayloadCodec.Builder().build()) + .namespace( + new KuraNamespace.Builder() + .accountName(this.accountName) + .build()) + .module( + KuraBirthCertificateModule.newBuilder(this.accountName) + .defaultProviders() + .build()); + + if (customizer != null) { + customizer.accept(builder); + } + + return builder.build(); + } + + private void validate() { + hasString("accountName", this.accountName); + hasString("brokerUrl", this.brokerUrl); + hasString("clientId", this.clientId); + } + + private static void hasString(final String name, final String value) { + if (value == null || value.isEmpty()) { + throw new IllegalStateException(String.format("'%s' must be set", name)); + } + } +} diff --git a/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/package-info.java b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/package-info.java new file mode 100644 index 00000000000..d871bfd61e1 --- /dev/null +++ b/client/gateway/profile/kura/src/main/java/org/eclipse/kapua/client/gateway/profile/kura/package-info.java @@ -0,0 +1,4 @@ +/** + * An Eclipse Kura™ based connection profile + */ +package org.eclipse.kapua.client.gateway.profile.kura; \ No newline at end of file diff --git a/client/gateway/profile/kura/src/main/protobuf/kurapayload.proto b/client/gateway/profile/kura/src/main/protobuf/kurapayload.proto new file mode 100644 index 00000000000..1e1c340be82 --- /dev/null +++ b/client/gateway/profile/kura/src/main/protobuf/kurapayload.proto @@ -0,0 +1,57 @@ +/* + * This file originates from the Eclipse Kura™ project + */ + +// +// To compile: +// protoc --proto_path=src/main/protobuf --java_out=src/main/java src/main/protobuf/kurapayload.proto +// +package kuradatatypes; + +option java_package = "org.eclipse.kapua.client.client.kura.payload"; +option java_outer_classname = "KuraPayloadProto"; + +message KuraPayload { + + message KuraMetric { + enum ValueType { + DOUBLE = 0; + FLOAT = 1; + INT64 = 2; + INT32 = 3; + BOOL = 4; + STRING = 5; + BYTES = 6; + } + + required string name = 1; + required ValueType type = 2; + + optional double double_value = 3; + optional float float_value = 4; + optional int64 long_value = 5; + optional int32 int_value = 6; + optional bool bool_value = 7; + optional string string_value = 8; + optional bytes bytes_value = 9; + } + + message KuraPosition { + required double latitude = 1; + required double longitude = 2; + optional double altitude = 3; + optional double precision = 4; // dilution of precision of the current satellite fix. + optional double heading = 5; // heading in degrees + optional double speed = 6; // meters per second + optional int64 timestamp = 7; + optional int32 satellites = 8; // number satellites locked by the GPS device + optional int32 status = 9; // status indicator for the GPS data: 1 = no GPS response; 2 = error in response; 4 = valid. + } + + optional int64 timestamp = 1; + optional KuraPosition position = 2; + + extensions 3 to 4999; + repeated KuraMetric metric = 5000; // can be zero, so optional + optional bytes body = 5001; +} diff --git a/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/KuraExample.java b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/KuraExample.java new file mode 100644 index 00000000000..9d0fdbc7bc0 --- /dev/null +++ b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/KuraExample.java @@ -0,0 +1,99 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway; + +import static org.eclipse.kapua.client.gateway.Credentials.userAndPassword; +import static org.eclipse.kapua.client.gateway.Errors.handle; +import static org.eclipse.kapua.client.gateway.Errors.ignore; +import static org.eclipse.kapua.client.gateway.Transport.waitForConnection; + +import org.eclipse.kapua.client.gateway.Application; +import org.eclipse.kapua.client.gateway.Client; +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.Sender; +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.mqtt.fuse.FuseClient; +import org.eclipse.kapua.client.gateway.profile.kura.KuraMqttProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class KuraExample { + + private static final Logger logger = LoggerFactory.getLogger(KuraExample.class); + + private KuraExample() { + } + + public static void main(final String[] args) throws Exception { + + try (final Client client = KuraMqttProfile.newProfile(FuseClient.Builder::new) + .accountName("kapua-sys") + .clientId("foo-bar-1") + .brokerUrl("tcp://localhost:1883") + .credentials(userAndPassword("kapua-broker", "kapua-password")) + .build()) { + + try (final Application application = client.buildApplication("app1").build()) { + + // wait for connection + + waitForConnection(application.transport()); + + // subscribe to a topic + + application.data(Topic.of("my", "topic")).subscribe(message -> { + System.out.format("Received: %s%n", message); + }); + + // example payload + + final Payload.Builder payload = new Payload.Builder(); + payload.put("foo", "bar"); + payload.put("a", 1); + + try { + // send, handling error ourself + application.data(Topic.of("my", "topic")).send(payload); + } catch (final Exception e) { + logger.info("Failed to publish", e); + } + + // send, with attached error handler + + application.data(Topic.of("my", "topic")) + .errors(handle((e, message) -> System.err.println("Failed to publish: " + e.getMessage()))) + .send(payload); + + // ignoring error + + application.data(Topic.of("my", "topic")).errors(ignore()).send(payload); + + // cache sender instance + + final Sender sender = application.data(Topic.of("my", "topic")).errors(ignore()); + + int i = 0; + while (i < 10) { + // send + sender.send(Payload.of("counter", i++)); + Thread.sleep(1_000); + } + + // sleep to not run into Paho thread starvation + // Thread.sleep(100_000); + } + + Thread.sleep(1_000); + + } + } +} diff --git a/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/NamespaceTest.java b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/NamespaceTest.java new file mode 100644 index 00000000000..a5ef57666ae --- /dev/null +++ b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/NamespaceTest.java @@ -0,0 +1,76 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura; + +import static org.junit.Assert.assertEquals; + +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.kura.KuraNamespace; +import org.eclipse.kapua.client.gateway.kura.KuraNamespace.Builder; +import org.junit.Assert; +import org.junit.Test; + +public class NamespaceTest { + + @Test(expected = IllegalArgumentException.class) + public void testNullAccount() { + new KuraNamespace.Builder().build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyAccount() { + new KuraNamespace.Builder().accountName("").build(); + } + + @Test + public void testSetAndGetAccount() { + final Builder builder = new KuraNamespace.Builder().accountName("foo"); + assertEquals("foo", builder.accountName()); + } + + @Test + public void testSegment1() { + final KuraNamespace namespace = new KuraNamespace.Builder() + .accountName("account") + .build(); + assertEquals("account/c1/a1/seg1", namespace.dataTopic("c1", "a1", Topic.of("seg1"))); + } + + @Test + public void testSegment2() { + final KuraNamespace namespace = new KuraNamespace.Builder() + .accountName("account") + .build(); + Assert.assertEquals("account/c1/a1/seg1/seg2", namespace.dataTopic("c1", "a1", Topic.of("seg1", "seg2"))); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidAccountId1() { + new KuraNamespace.Builder() + .accountName("account/1") + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidAccountId2() { + new KuraNamespace.Builder() + .accountName("#") + .build(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidAccountId3() { + new KuraNamespace.Builder() + .accountName("+") + .build(); + } +} diff --git a/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/PayloadCodecTest.java b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/PayloadCodecTest.java new file mode 100644 index 00000000000..1345a737a6b --- /dev/null +++ b/client/gateway/profile/kura/src/test/java/org/eclipse/kapua/client/gateway/kura/PayloadCodecTest.java @@ -0,0 +1,94 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.kura; + +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.Collections; + +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.kura.KuraBinaryPayloadCodec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PayloadCodecTest { + + private KuraBinaryPayloadCodec codec; + + @Before + public void setup() { + final KuraBinaryPayloadCodec.Builder builder = new KuraBinaryPayloadCodec.Builder(); + codec = builder.build(); + } + + @Test(expected = NullPointerException.class) + public void testEncodeNull1() throws Exception { + codec.encode(null, null); + } + + @Test + public void testEncodeEmpty1() throws Exception { + final ByteBuffer result = codec.encode(Payload.of(Collections.emptyMap()), null); + assertNotNull(result); + } + + @Test(expected = NullPointerException.class) + public void testDecodeNull1() throws Exception { + codec.decode(null); + } + + @Test + public void testDecode1() throws Exception { + final ByteBuffer buffer = codec.encode(Payload.of(Collections.emptyMap()), null); + buffer.flip(); + + final Payload payload = codec.decode(buffer); + + assertNotNull(payload); + assertTrue(payload.getValues().isEmpty()); + } + + @Test + public void testDecode2() throws Exception { + processCodecTwo(null); + } + + @Test + public void testDecode3() throws Exception { + processCodecTwo(ByteBuffer.allocate(100)); + } + + private void processCodecTwo(ByteBuffer buffer) throws Exception { + buffer = codec.encode(Payload.of("foo", 1), buffer); + buffer = codec.encode(Payload.of("bar", 2), buffer); + + buffer.flip(); + + final ByteBuffer buffer1 = buffer.slice(); + buffer1.limit(20); + final Payload payload1 = codec.decode(buffer1); + + final ByteBuffer buffer2 = buffer.slice(); + buffer2.position(20); + final Payload payload2 = codec.decode(buffer2); + + assertNotNull(payload1); + assertNotNull(payload2); + + Assert.assertEquals(singletonMap("foo", 1), payload1.getValues()); + Assert.assertEquals(singletonMap("bar", 2), payload2.getValues()); + } +} diff --git a/client/gateway/provider/fuse/about.html b/client/gateway/provider/fuse/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/provider/fuse/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/provider/fuse/pom.xml b/client/gateway/provider/fuse/pom.xml new file mode 100644 index 00000000000..ae590ddd326 --- /dev/null +++ b/client/gateway/provider/fuse/pom.xml @@ -0,0 +1,67 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway-provider + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-provider-fuse + bundle + Eclipse Kapua :: Gateway Client :: Provider :: FUSE MQTT + An MQTT client implementation based on FUSE MQTT + + + + + org.eclipse.kapua + kapua-client-gateway-api + + + + org.eclipse.kapua + kapua-client-gateway-provider-mqtt + + + + org.eclipse.kapua + kapua-client-gateway-spi + + + + org.slf4j + slf4j-api + + + + org.fusesource.mqtt-client + mqtt-client + 1.14 + + + + + ch.qos.logback + logback-classic + + + + \ No newline at end of file diff --git a/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/FuseClient.java b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/FuseClient.java new file mode 100644 index 00000000000..c0a045792b6 --- /dev/null +++ b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/FuseClient.java @@ -0,0 +1,217 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt.fuse; + +import static java.util.Objects.requireNonNull; +import static org.eclipse.kapua.client.gateway.utils.Strings.nonEmptyText; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.eclipse.kapua.client.gateway.BinaryPayloadCodec; +import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword; +import org.eclipse.kapua.client.gateway.mqtt.MqttClient; +import org.eclipse.kapua.client.gateway.mqtt.MqttMessageHandler; +import org.eclipse.kapua.client.gateway.mqtt.MqttNamespace; +import org.eclipse.kapua.client.gateway.mqtt.fuse.internal.Callbacks; +import org.eclipse.kapua.client.gateway.Module; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.UTF8Buffer; +import org.fusesource.mqtt.client.Callback; +import org.fusesource.mqtt.client.CallbackConnection; +import org.fusesource.mqtt.client.ExtendedListener; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Promise; +import org.fusesource.mqtt.client.QoS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FuseClient extends MqttClient { + + private static final Logger logger = LoggerFactory.getLogger(FuseClient.class); + + public static class Builder extends MqttClient.Builder { + + @Override + protected Builder builder() { + return this; + } + + @Override + public FuseClient build() throws Exception { + + final URI broker = requireNonNull(broker(), "Broker must be set"); + final String clientId = nonEmptyText(clientId(), "clientId"); + + final MqttNamespace namespace = requireNonNull(namespace(), "Namespace must be set"); + final BinaryPayloadCodec codec = requireNonNull(codec(), "Codec must be set"); + + final MQTT mqtt = new MQTT(); + mqtt.setCleanSession(false); + mqtt.setHost(broker); + mqtt.setClientId(clientId); + + final Object credentials = credentials(); + if (credentials == null) { + // none + } else if (credentials instanceof UserAndPassword) { + final UserAndPassword userAndPassword = (UserAndPassword) credentials; + mqtt.setUserName(userAndPassword.getUsername()); + mqtt.setPassword(userAndPassword.getPasswordAsString()); + } else { + throw new IllegalStateException( + String.format("Unknown credentials type: %s", credentials.getClass().getName())); + } + + CallbackConnection connection = mqtt.callbackConnection(); + ScheduledExecutorService executor = createExecutor(clientId); + try { + final FuseClient result = new FuseClient(modules(), clientId, executor, namespace, codec, connection); + connection = null; + executor = null; + return result; + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + } + + private static ScheduledExecutorService createExecutor(final String clientId) { + return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, clientId)); + } + + private final ExtendedListener listener = new ExtendedListener() { + + @Override + public void onPublish(final UTF8Buffer topic, final Buffer body, final Runnable ack) { + onPublish(topic, body, new Callback>() { + + @Override + public void onSuccess(Callback value) { + ack.run(); + } + + @Override + public void onFailure(Throwable value) { + } + + }); + } + + @Override + public void onFailure(Throwable value) { + } + + @Override + public void onDisconnected() { + handleDisconnected(); + } + + @Override + public void onConnected() { + handleConnected(); + } + + @Override + public void onPublish(final UTF8Buffer topic, final Buffer body, final Callback> ack) { + handleMessageArrived(topic.toString(), body, ack); + } + }; + + private final CallbackConnection connection; + + private final Map subscriptions = new HashMap<>(); + + private FuseClient(final Set modules, final String clientId, final ScheduledExecutorService executor, + final MqttNamespace namespace, final BinaryPayloadCodec codec, final CallbackConnection connection) { + + super(executor, codec, namespace, clientId, modules); + + this.connection = connection; + + connection.listener(listener); + connection.connect(new Promise<>()); + } + + @Override + public void close() { + connection.disconnect(null); + executor.shutdown(); + } + + @Override + public void publishMqtt(final String topic, final ByteBuffer payload) { + connection.publish(Buffer.utf8(topic), new Buffer(payload), QoS.AT_LEAST_ONCE, false, null); + } + + @Override + protected CompletionStage subscribeMqtt(final String topic, final MqttMessageHandler messageHandler) { + synchronized (this) { + subscriptions.put(topic, messageHandler); + + final CompletableFuture future = new CompletableFuture<>(); + connection.subscribe( + new org.fusesource.mqtt.client.Topic[] { + new org.fusesource.mqtt.client.Topic(topic, QoS.AT_LEAST_ONCE) }, + Callbacks.asCallback(future)); + + return future; + } + } + + @Override + protected void unsubscribeMqtt(final Set mqttTopics) { + + logger.info("Unsubscribe from: {}", mqttTopics); + + final List topics = new ArrayList<>(mqttTopics.size()); + + synchronized (this) { + for (final String topic : mqttTopics) { + if (subscriptions.remove(topic) != null) { + topics.add(new UTF8Buffer(topic)); + } + } + } + + connection.unsubscribe(topics.toArray(new UTF8Buffer[topics.size()]), new Promise<>()); + } + + protected void handleMessageArrived(final String topic, final Buffer payload, final Callback> ack) { + final MqttMessageHandler handler; + + synchronized (this) { + handler = subscriptions.get(topic); + } + + if (handler != null) { + try { + handler.handleMessage(topic, payload.toByteBuffer()); + ack.onSuccess(null); + } catch (Exception e) { + ack.onFailure(e); + } + } + } + +} diff --git a/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/internal/Callbacks.java b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/internal/Callbacks.java new file mode 100644 index 00000000000..2dc4fed9d37 --- /dev/null +++ b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/internal/Callbacks.java @@ -0,0 +1,37 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt.fuse.internal; + +import java.util.concurrent.CompletableFuture; + +import org.fusesource.mqtt.client.Callback; + +public final class Callbacks { + + private Callbacks() { + } + + public static Callback asCallback(CompletableFuture future) { + return new Callback() { + + @Override + public void onSuccess(T value) { + future.complete(value); + } + + @Override + public void onFailure(Throwable value) { + future.completeExceptionally(value); + } + }; + } +} diff --git a/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/package-info.java b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/package-info.java new file mode 100644 index 00000000000..d5c50e74525 --- /dev/null +++ b/client/gateway/provider/fuse/src/main/java/org/eclipse/kapua/client/gateway/mqtt/fuse/package-info.java @@ -0,0 +1,4 @@ +/** + * An MQTT based client using FUSE MQTT + */ +package org.eclipse.kapua.client.gateway.mqtt.fuse; \ No newline at end of file diff --git a/client/gateway/provider/mqtt/about.html b/client/gateway/provider/mqtt/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/provider/mqtt/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/provider/mqtt/pom.xml b/client/gateway/provider/mqtt/pom.xml new file mode 100644 index 00000000000..b565a5c0cfd --- /dev/null +++ b/client/gateway/provider/mqtt/pom.xml @@ -0,0 +1,57 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway-provider + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-provider-mqtt + bundle + Eclipse Kapua :: Gateway Client :: Provider :: Core MQTT + Abstract base classes for implementing MQTT based clients + + + + + org.eclipse.kapua + kapua-client-gateway-api + + + + org.eclipse.kapua + kapua-client-gateway-spi + + + + org.slf4j + slf4j-api + + + + + + ch.qos.logback + logback-classic + + + + \ No newline at end of file diff --git a/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttApplication.java b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttApplication.java new file mode 100644 index 00000000000..905a37ff9b1 --- /dev/null +++ b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttApplication.java @@ -0,0 +1,76 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; + +import org.eclipse.kapua.client.gateway.ErrorHandler; +import org.eclipse.kapua.client.gateway.MessageHandler; +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.spi.AbstractApplication; +import org.eclipse.kapua.client.gateway.spi.AbstractData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttApplication extends AbstractApplication { + + private static final Logger logger = LoggerFactory.getLogger(MqttApplication.class); + + private final MqttClient client; + + public MqttApplication(final MqttClient client, final String applicationId, final Executor executor) { + super(client, applicationId, executor); + this.client = client; + } + + @Override + public AbstractData data(final Topic topic) { + return new AbstractData(this, topic); + } + + @Override + protected void publish(Topic topic, Payload payload) throws Exception { + logger.debug("Publishing values - {} -> {}", topic, payload.getValues()); + + final ByteBuffer buffer = client.getCodec().encode(payload, null); + buffer.flip(); + + client.publish(applicationId, topic, buffer); + } + + @Override + protected CompletionStage internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler errorHandler) throws Exception { + return client.subscribe(applicationId, topic, (messageTopic, payload) -> { + logger.debug("Received message for: {}", topic); + try { + MqttApplication.this.handleMessage(handler, payload); + } catch (final Exception e) { + try { + errorHandler.handleError(e, null); + } catch (final Exception e1) { + throw e1; + } catch (final Throwable e1) { + throw new Exception(e1); + } + } + }); + } + + protected void handleMessage(final MessageHandler handler, final ByteBuffer buffer) throws Exception { + final Payload payload = client.getCodec().decode(buffer); + logger.debug("Received: {}", payload); + handler.handleMessage(payload); + } +} \ No newline at end of file diff --git a/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttClient.java b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttClient.java new file mode 100644 index 00000000000..ae77daf16fd --- /dev/null +++ b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttClient.java @@ -0,0 +1,141 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import org.eclipse.kapua.client.gateway.Application; +import org.eclipse.kapua.client.gateway.BinaryPayloadCodec; +import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword; +import org.eclipse.kapua.client.gateway.spi.AbstractApplication; +import org.eclipse.kapua.client.gateway.spi.AbstractClient; +import org.eclipse.kapua.client.gateway.Module; +import org.eclipse.kapua.client.gateway.Topic; + +public abstract class MqttClient extends AbstractClient { + + public abstract static class Builder> extends AbstractClient.Builder { + + private MqttNamespace namespace; + private BinaryPayloadCodec codec; + private UserAndPassword userAndPassword; + private String clientId; + private URI broker; + + public T codec(final BinaryPayloadCodec codec) { + this.codec = codec; + return builder(); + } + + public BinaryPayloadCodec codec() { + return this.codec; + } + + public T namespace(final MqttNamespace namespace) { + this.namespace = namespace; + return builder(); + } + + public MqttNamespace namespace() { + return this.namespace; + } + + public T clientId(final String clientId) { + this.clientId = clientId; + return builder(); + } + + public String clientId() { + return this.clientId; + } + + public T credentials(final UserAndPassword userAndPassword) { + this.userAndPassword = userAndPassword; + return builder(); + } + + public T broker(final String broker) throws URISyntaxException { + Objects.requireNonNull(broker); + this.broker = new URI(broker); + return builder(); + } + + public T broker(final URI broker) throws URISyntaxException { + Objects.requireNonNull(broker); + this.broker = broker; + return builder(); + } + + public URI broker() { + return this.broker; + } + + public Object credentials() { + return this.userAndPassword; + } + } + + private final String clientId; + private final BinaryPayloadCodec codec; + private final MqttNamespace namespace; + + public MqttClient(final ScheduledExecutorService executor, final BinaryPayloadCodec codec, final MqttNamespace namespace, final String clientId, final Set modules) { + super(executor, modules); + this.clientId = clientId; + this.codec = codec; + this.namespace = namespace; + } + + protected void publish(String applicationId, Topic topic, ByteBuffer buffer) throws Exception { + final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic); + publishMqtt(mqttTopic, buffer); + } + + public abstract void publishMqtt(String topic, ByteBuffer payload) throws Exception; + + protected abstract CompletionStage subscribeMqtt(String topic, MqttMessageHandler messageHandler) throws Exception; + + protected CompletionStage subscribe(final String applicationId, final Topic topic, final MqttMessageHandler messageHandler) throws Exception { + final String mqttTopic = namespace.dataTopic(clientId, applicationId, topic); + return subscribeMqtt(mqttTopic, messageHandler); + } + + @Override + protected void internalUnsubscribe(final String applicationId, final Collection topics) throws Exception { + Set mqttTopics = topics.stream().map(topic -> namespace.dataTopic(clientId, applicationId, topic)).collect(Collectors.toSet()); + unsubscribeMqtt(mqttTopics); + } + + protected abstract void unsubscribeMqtt(Set mqttTopics) throws Exception; + + public String getMqttClientId() { + return clientId; + } + + @Override + protected AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId) { + return new MqttApplication(this, applicationId, executor); + } + + protected BinaryPayloadCodec getCodec() { + return codec; + } + +} diff --git a/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttMessageHandler.java b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttMessageHandler.java new file mode 100644 index 00000000000..c06992bbb09 --- /dev/null +++ b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttMessageHandler.java @@ -0,0 +1,20 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt; + +import java.nio.ByteBuffer; + +@FunctionalInterface +public interface MqttMessageHandler { + + public void handleMessage(String topic, ByteBuffer payload) throws Exception; +} diff --git a/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttNamespace.java b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttNamespace.java new file mode 100644 index 00000000000..b009a6b9859 --- /dev/null +++ b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/MqttNamespace.java @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt; + +import org.eclipse.kapua.client.gateway.Topic; + +/** + * A namespace implementation for MQTT + */ +public interface MqttNamespace { + + /** + * Render an MQTT topic for the provided data topic + * + * @param clientId + * The MQTT client ID + * @param applicationId + * The application ID + * @param topic + * The data topid + * @return The topic or {@code null} if no topic could be rendered for the provided input parameters + */ + public String dataTopic(String clientId, String applicationId, Topic topic); +} diff --git a/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/package-info.java b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/package-info.java new file mode 100644 index 00000000000..5050bec92bd --- /dev/null +++ b/client/gateway/provider/mqtt/src/main/java/org/eclipse/kapua/client/gateway/mqtt/package-info.java @@ -0,0 +1,4 @@ +/** + * Abstract base classes for implementing MQTT based clients + */ +package org.eclipse.kapua.client.gateway.mqtt; \ No newline at end of file diff --git a/client/gateway/provider/paho/about.html b/client/gateway/provider/paho/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/provider/paho/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/provider/paho/pom.xml b/client/gateway/provider/paho/pom.xml new file mode 100644 index 00000000000..35f9b2092ae --- /dev/null +++ b/client/gateway/provider/paho/pom.xml @@ -0,0 +1,71 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway-provider + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-provider-paho + bundle + Eclipse Kapua :: Gateway Client :: Provider :: Eclipse Paho + An MQTT client implementation based on Eclipse Paho + + + + + org.eclipse.kapua + kapua-client-gateway-api + + + + org.eclipse.kapua + kapua-client-gateway-provider-mqtt + + + + org.eclipse.kapua + kapua-client-gateway-spi + + + + org.slf4j + slf4j-api + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + 1.0.2 + + + + + ch.qos.logback + logback-classic + + + + \ No newline at end of file diff --git a/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/PahoClient.java b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/PahoClient.java new file mode 100644 index 00000000000..ce317b0934a --- /dev/null +++ b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/PahoClient.java @@ -0,0 +1,297 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt.paho; + +import static java.util.Objects.requireNonNull; +import static org.eclipse.kapua.client.gateway.utils.Strings.nonEmptyText; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.eclipse.kapua.client.gateway.BinaryPayloadCodec; +import org.eclipse.kapua.client.gateway.Credentials.UserAndPassword; +import org.eclipse.kapua.client.gateway.mqtt.MqttClient; +import org.eclipse.kapua.client.gateway.mqtt.MqttMessageHandler; +import org.eclipse.kapua.client.gateway.mqtt.MqttNamespace; +import org.eclipse.kapua.client.gateway.mqtt.paho.internal.Listeners; +import org.eclipse.kapua.client.gateway.utils.Buffers; +import org.eclipse.kapua.client.gateway.Module; +import org.eclipse.paho.client.mqttv3.IMqttActionListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PahoClient extends MqttClient { + + private static final Logger logger = LoggerFactory.getLogger(PahoClient.class); + + public static class Builder extends MqttClient.Builder { + + private Supplier persistenceProvider = MemoryPersistence::new; + + @Override + protected Builder builder() { + return this; + } + + public Builder persistentProvider(final Supplier provider) { + if (provider != null) { + persistenceProvider = provider; + } else { + persistenceProvider = MemoryPersistence::new; + } + return builder(); + } + + public Supplier persistentProvider() { + return persistenceProvider; + } + + @Override + public PahoClient build() throws Exception { + + final URI broker = requireNonNull(broker(), "Broker must be set"); + final String clientId = nonEmptyText(clientId(), "clientId"); + + final MqttClientPersistence persistence = requireNonNull(persistenceProvider.get(), "Persistence provider returned 'null' persistence"); + final MqttNamespace namespace = requireNonNull(namespace(), "Namespace must be set"); + final BinaryPayloadCodec codec = requireNonNull(codec(), "Codec must be set"); + + MqttAsyncClient client = new MqttAsyncClient(broker.toString(), clientId, persistence); + ScheduledExecutorService executor = createExecutor(clientId); + try { + final PahoClient result = new PahoClient(modules(), clientId, executor, namespace, codec, client, persistence, createConnectOptions(this)); + client = null; + executor = null; + return result; + } finally { + if (executor != null) { + executor.shutdown(); + } + if (client != null) { + try { + client.disconnectForcibly(0); + } finally { + client.close(); + } + } + } + } + } + + private static ScheduledExecutorService createExecutor(final String clientId) { + return Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, clientId)); + } + + private static MqttConnectOptions createConnectOptions(final Builder builder) { + final MqttConnectOptions result = new MqttConnectOptions(); + + final Object credentials = builder.credentials(); + if (credentials instanceof UserAndPassword) { + final UserAndPassword userAndPassword = (UserAndPassword) credentials; + result.setUserName(userAndPassword.getUsername()); + result.setPassword(userAndPassword.getPassword()); + } else if (credentials == null) { + // ignore + } else { + throw new IllegalArgumentException(String.format("Unsupported credentials type: %s", credentials.getClass().getName())); + } + + return result; + } + + private final MqttConnectOptions connectOptions; + private MqttAsyncClient client; + + private final Map subscriptions = new HashMap<>(); + + private PahoClient(final Set modules, final String clientId, final ScheduledExecutorService executor, final MqttNamespace namespace, final BinaryPayloadCodec codec, + final MqttAsyncClient client, final MqttClientPersistence persistence, final MqttConnectOptions connectOptions) { + + super(executor, codec, namespace, clientId, modules); + + this.connectOptions = connectOptions; + this.client = client; + + this.client.setCallback(new MqttCallback() { + + @Override + public void messageArrived(final String topic, final MqttMessage message) throws Exception { + handleMessageArrived(topic, message); + } + + @Override + public void deliveryComplete(final IMqttDeliveryToken token) { + } + + @Override + public void connectionLost(final Throwable cause) { + handleDisconnected(); + } + }); + + this.executor.execute(this::connect); + } + + protected void connect() { + try { + client.connect(connectOptions, null, new IMqttActionListener() { + + @Override + public void onSuccess(final IMqttToken asyncActionToken) { + handleConnected(); + } + + @Override + public void onFailure(final IMqttToken asyncActionToken, final Throwable exception) { + handleDisconnected(); + } + }); + } catch (final MqttException e) { + logger.warn("Failed to call connect", e); + } + } + + @Override + public void close() { + + final MqttAsyncClient client; + + synchronized (this) { + client = this.client; + if (client == null) { + return; + } + this.client = null; + } + + try { + // disconnect first + + try { + client.disconnect().waitForCompletion(); + } catch (final MqttException e) { + } + + // now try to close (and free the resources) + + try { + client.close(); + } catch (final MqttException e) { + } + } finally { + executor.shutdown(); + } + } + + @Override + protected void handleConnected() { + synchronized (this) { + super.handleConnected(); + handleResubscribe(); + } + } + + private void handleResubscribe() { + for (final Map.Entry entry : subscriptions.entrySet()) { + try { + internalSubscribe(entry.getKey()); + } catch (final MqttException e) { + logger.warn("Failed to re-subscribe to '{}'", entry.getKey()); + } + } + } + + @Override + protected void handleDisconnected() { + synchronized (this) { + try { + super.handleDisconnected(); + } finally { + executor.schedule(this::connect, 1, TimeUnit.SECONDS); + } + } + } + + @Override + public void publishMqtt(final String topic, final ByteBuffer payload) throws Exception { + publish(topic, payload); + } + + protected void publish(final String topic, final ByteBuffer payload) throws MqttException { + logger.debug("Publishing {} - {}", topic, payload); + client.publish(topic, Buffers.toByteArray(payload), 1, false); + } + + @Override + protected CompletionStage subscribeMqtt(String topic, MqttMessageHandler messageHandler) throws MqttException { + synchronized (this) { + subscriptions.put(topic, messageHandler); + return internalSubscribe(topic); + } + } + + @Override + protected void unsubscribeMqtt(final Set mqttTopics) throws MqttException { + logger.info("Unsubscribe from: {}", mqttTopics); + + final List topics = new ArrayList<>(mqttTopics.size()); + + synchronized (this) { + for (String topic : mqttTopics) { + if (subscriptions.remove(topic) != null) { + topics.add(topic); + } + } + } + + client.unsubscribe(topics.toArray(new String[topics.size()])); + } + + protected void handleMessageArrived(final String topic, final MqttMessage message) throws Exception { + final ByteBuffer buffer = Buffers.wrap(message.getPayload()); + buffer.flip(); + + logger.debug("Received message - mqtt-topic: {}, payload: {}", topic, buffer); + + final MqttMessageHandler handler = subscriptions.get(topic); + if (handler != null) { + handler.handleMessage(topic, buffer); + } + } + + private CompletionStage internalSubscribe(final String topic) throws MqttException { + final CompletableFuture future = new CompletableFuture<>(); + client.subscribe(topic, 1, null, Listeners.toListener(future)); + return future; + } + +} diff --git a/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/internal/Listeners.java b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/internal/Listeners.java new file mode 100644 index 00000000000..42e4e8e568d --- /dev/null +++ b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/internal/Listeners.java @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.mqtt.paho.internal; + +import java.util.concurrent.CompletableFuture; + +import org.eclipse.paho.client.mqttv3.IMqttActionListener; +import org.eclipse.paho.client.mqttv3.IMqttToken; + +public final class Listeners { + + private Listeners() { + } + + public static IMqttActionListener toListener(final CompletableFuture token) { + return new IMqttActionListener() { + + @Override + public void onSuccess(final IMqttToken asyncActionToken) { + token.complete(null); + } + + @Override + public void onFailure(final IMqttToken asyncActionToken, final Throwable exception) { + token.completeExceptionally(exception); + } + }; + } +} diff --git a/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/package-info.java b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/package-info.java new file mode 100644 index 00000000000..5fe8dc978da --- /dev/null +++ b/client/gateway/provider/paho/src/main/java/org/eclipse/kapua/client/gateway/mqtt/paho/package-info.java @@ -0,0 +1,4 @@ +/** + * An MQTT based client using Eclipse Paho™ + */ +package org.eclipse.kapua.client.gateway.mqtt.paho; \ No newline at end of file diff --git a/client/gateway/provider/pom.xml b/client/gateway/provider/pom.xml new file mode 100644 index 00000000000..9b3b990d94a --- /dev/null +++ b/client/gateway/provider/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-provider + pom + + + fuse + mqtt + paho + + + diff --git a/client/gateway/spi/about.html b/client/gateway/spi/about.html new file mode 100644 index 00000000000..832cf11556a --- /dev/null +++ b/client/gateway/spi/about.html @@ -0,0 +1,26 @@ + + + +About + + +

About This Content

+ +

June 27, 2017

+

License

+ +

The Eclipse Foundation makes available all content in this plug-in ("Content"). Unless otherwise +indicated below, the Content is provided to you under the terms and conditions of the +Eclipse Public License Version 1.0 ("EPL"). A copy of the EPL is available +at http://www.eclipse.org/legal/epl-v10.html. +For purposes of the EPL, "Program" will mean the Content.

+ +

If you did not receive this Content directly from the Eclipse Foundation, the Content is +being redistributed by another party ("Redistributor") and different terms and conditions may +apply to your use of any object code in the Content. Check the Redistributor's license that was +provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise +indicated below, the terms and conditions of the EPL still apply to any source code in the Content +and such source code may be obtained at http://www.eclipse.org.

+ + + \ No newline at end of file diff --git a/client/gateway/spi/pom.xml b/client/gateway/spi/pom.xml new file mode 100644 index 00000000000..f9ee232f87c --- /dev/null +++ b/client/gateway/spi/pom.xml @@ -0,0 +1,54 @@ + + + + + + + 4.0.0 + + + org.eclipse.kapua + kapua-client-gateway + 0.2.0-SNAPSHOT + .. + + + kapua-client-gateway-spi + bundle + Eclipse Kapua :: Gateway Client :: SPI + + + + + org.eclipse.kapua + kapua-client-gateway-api + + + + org.slf4j + slf4j-api + + + + + ch.qos.logback + logback-classic + + + junit + junit + + + + diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractApplication.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractApplication.java new file mode 100644 index 00000000000..28baa77aaf9 --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractApplication.java @@ -0,0 +1,94 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.spi; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; + +import org.eclipse.kapua.client.gateway.Application; +import org.eclipse.kapua.client.gateway.ErrorHandler; +import org.eclipse.kapua.client.gateway.MessageHandler; +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.Transport; +import org.eclipse.kapua.client.gateway.utils.TransportAsync; + +public abstract class AbstractApplication implements Application { + + private final AbstractClient client; + protected final Set subscriptions = new HashSet<>(); + protected final String applicationId; + protected final TransportAsync transport; + private boolean closed; + + public AbstractApplication(final AbstractClient client, final String applicationId, final Executor executor) { + this.client = client; + this.applicationId = applicationId; + transport = new TransportAsync(executor); + } + + protected synchronized void handleConnected() { + if (closed) { + return; + } + transport.handleConnected(); + } + + protected synchronized void handleDisconnected() { + if (closed) { + return; + } + transport.handleDisconnected(); + } + + protected void checkClosed() { + if (closed) { + throw new IllegalStateException("Application is closed"); + } + } + + @Override + public synchronized Transport transport() { + checkClosed(); + return transport; + } + + @Override + public abstract AbstractData data(Topic topic); + + @Override + public void close() throws Exception { + synchronized (this) { + if (closed) { + return; + } + closed = true; + } + + client.internalCloseApplication(applicationId, subscriptions, this); + } + + protected abstract void publish(Topic topic, Payload payload) throws Exception; + + public CompletionStage subscribe(Topic topic, MessageHandler handler, ErrorHandler errorHandler) throws Exception { + recordSubscription(topic); + return internalSubscribe(topic, handler, errorHandler); + } + + private void recordSubscription(final Topic topic) { + subscriptions.add(topic); + } + + protected abstract CompletionStage internalSubscribe(Topic topic, MessageHandler handler, ErrorHandler errorHandler) throws Exception; +} \ No newline at end of file diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractClient.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractClient.java new file mode 100644 index 00000000000..ae163c3a19d --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractClient.java @@ -0,0 +1,173 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.spi; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; + +import org.eclipse.kapua.client.gateway.Application; +import org.eclipse.kapua.client.gateway.Client; +import org.eclipse.kapua.client.gateway.Module; +import org.eclipse.kapua.client.gateway.ModuleContext; +import org.eclipse.kapua.client.gateway.Topic; +import org.eclipse.kapua.client.gateway.Transport; +import org.eclipse.kapua.client.gateway.utils.TransportAsync; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractClient implements Client { + + private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); + + public static abstract class Builder> implements Client.Builder { + + protected abstract T builder(); + + private final Set modules = new HashSet<>(); + + public T module(final Module module) { + Objects.requireNonNull(module); + + this.modules.add(module); + return builder(); + } + + public Set modules() { + return this.modules; + } + } + + protected final ScheduledExecutorService executor; + private final Set modules; + + private final TransportAsync transport; + + private final Map applications = new HashMap<>(); + + public AbstractClient(final ScheduledExecutorService executor, final Set modules) { + this.executor = executor; + this.modules = new HashSet<>(modules); + + transport = new TransportAsync(executor); + + fireModuleEvent(module -> module.initialize(new ModuleContext() { + + @Override + public Client getClient() { + return AbstractClient.this; + } + })); + } + + @Override + public Transport transport() { + return transport; + } + + private void fireModuleEvent(final Consumer consumer) { + for (final Module module : modules) { + try { + consumer.accept(module); + } catch (final Exception e) { + logger.info("Failed to process module event", e); + } + } + } + + protected void notifyAddApplication(final String applicationId) { + fireModuleEvent(module -> module.applicationAdded(applicationId)); + } + + protected void notifyRemoveApplication(final String applicationId) { + fireModuleEvent(module -> module.applicationRemoved(applicationId)); + } + + protected void notifyConnected() { + fireModuleEvent(Module::connected); + transport.handleConnected(); + } + + protected void notifyDisconnected() { + fireModuleEvent(Module::disconnected); + transport.handleDisconnected(); + } + + protected void handleConnected() { + logger.debug("Connected"); + + notifyConnected(); + synchronized (this) { + applications.values().stream().forEach(app -> app.handleConnected()); + } + } + + protected void handleDisconnected() { + logger.debug("Disconnected"); + + notifyDisconnected(); + synchronized (this) { + applications.values().stream().forEach(app -> app.handleDisconnected()); + } + } + + @Override + public Application.Builder buildApplication(final String applicationId) { + return new Application.Builder() { + + @Override + public Application build() { + return internalBuildApplication(this, applicationId); + } + }; + } + + protected AbstractApplication internalBuildApplication(final Application.Builder builder, final String applicationId) { + synchronized (this) { + if (applications.containsKey(applicationId)) { + throw new IllegalStateException(String.format("An application with the ID '%s' already exists", applicationId)); + } + + final AbstractApplication result = internalCreateApplication(builder, applicationId); + + applications.put(applicationId, result); + notifyAddApplication(applicationId); + + return result; + } + } + + protected abstract AbstractApplication internalCreateApplication(final Application.Builder builder, final String applicationId); + + protected abstract void internalUnsubscribe(String applicationId, Collection topics) throws Exception; + + protected synchronized void internalCloseApplication(final String applicationId, Set topics, final AbstractApplication application) { + if (applications.remove(applicationId, application)) { + try { + internalUnsubscribe(applicationId, topics); + } catch (Exception e) { + logger.warn("Failed to unsubscribe on application close", e); + } + handleApplicationClosed(applicationId, application); + } + } + + protected void handleApplicationClosed(final String applicationId, final AbstractApplication application) { + notifyRemoveApplication(applicationId); + } + +} diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractData.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractData.java new file mode 100644 index 00000000000..2b83a9908a6 --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/AbstractData.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.spi; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.CompletionStage; + +import org.eclipse.kapua.client.gateway.Data; +import org.eclipse.kapua.client.gateway.ErrorHandler; +import org.eclipse.kapua.client.gateway.MessageHandler; +import org.eclipse.kapua.client.gateway.Payload; +import org.eclipse.kapua.client.gateway.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AbstractData implements Data { + + private static final Logger logger = LoggerFactory.getLogger(AbstractData.class); + + private final AbstractApplication application; + private final Topic topic; + + public AbstractData(final AbstractApplication application, final Topic topic) { + this.application = application; + this.topic = topic; + } + + @Override + public void send(final Payload payload) throws Exception { + application.publish(topic, payload); + } + + @Override + public void subscribe(final MessageHandler handler, final ErrorHandler errorHandler) throws Exception { + requireNonNull(handler); + requireNonNull(errorHandler); + + logger.debug("Setting subscription for: {}", topic); + + final CompletionStage future = application.subscribe(topic, handler, errorHandler); + future.toCompletableFuture().get(); + } +} diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/package-info.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/package-info.java new file mode 100644 index 00000000000..def4945b8cf --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/spi/package-info.java @@ -0,0 +1,4 @@ +/** + * Abstract base classes for implementing {@link org.eclipse.kapua.gateway.client.Client}s + */ +package org.eclipse.kapua.client.gateway.spi; \ No newline at end of file diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Buffers.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Buffers.java new file mode 100644 index 00000000000..3b225cfc068 --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Buffers.java @@ -0,0 +1,55 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +import java.nio.ByteBuffer; + +public final class Buffers { + + private Buffers() { + } + + /** + * Wrap bytes into a {@link ByteBuffer}, as if they would just haven been put + * + * @param data + * the data to use, may be {@code null} or empty + * @return the new {@link ByteBuffer}, may be {@code null} or empty, if the input is {@code null} or empty + */ + public static ByteBuffer wrap(final byte[] data) { + if (data == null) { + return null; + } + + final ByteBuffer buffer = ByteBuffer.wrap(data); + buffer.position(buffer.limit()); + + return buffer; + } + + /** + * Extract the remaining data as a byte array + * + * @param buffer + * the input buffer, may be {@code null} + * @return the output array, may be empty or {@code null} if the input is empty or {@code null} + */ + public static byte[] toByteArray(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + final byte[] byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + return byteArray; + } +} diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Strings.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Strings.java new file mode 100644 index 00000000000..f30777ac96b --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/Strings.java @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +public final class Strings { + + private Strings() { + } + + public static String nonEmptyText(final String string, final String fieldName) { + if (string == null || string.isEmpty()) { + throw new IllegalArgumentException(String.format("'%s' must not be null or empty", fieldName)); + } + return string; + } + +} diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/TransportAsync.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/TransportAsync.java new file mode 100644 index 00000000000..68118805da9 --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/TransportAsync.java @@ -0,0 +1,57 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +import java.util.concurrent.Executor; +import java.util.function.Consumer; + +import org.eclipse.kapua.client.gateway.Transport; + +public class TransportAsync implements Transport { + + private final Executor executor; + private Consumer listener; + private boolean state; + + public TransportAsync(final Executor executor) { + this.executor = executor; + } + + private void fireEvent(final boolean state, final Consumer listener) { + if (listener == null) { + return; + } + executor.execute(() -> listener.accept(state)); + } + + public synchronized void handleConnected() { + if (!state) { + state = true; + fireEvent(true, listener); + } + } + + public synchronized void handleDisconnected() { + if (state) { + state = false; + fireEvent(false, listener); + } + } + + @Override + public void state(final Consumer listener) { + synchronized (this) { + this.listener = listener; + fireEvent(state, listener); + } + } +} diff --git a/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/package-info.java b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/package-info.java new file mode 100644 index 00000000000..6250d30a547 --- /dev/null +++ b/client/gateway/spi/src/main/java/org/eclipse/kapua/client/gateway/utils/package-info.java @@ -0,0 +1,4 @@ +/** + * Commons base utilities + */ +package org.eclipse.kapua.client.gateway.utils; \ No newline at end of file diff --git a/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/BuffersTest.java b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/BuffersTest.java new file mode 100644 index 00000000000..0c4c9e83190 --- /dev/null +++ b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/BuffersTest.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +import java.nio.ByteBuffer; + +import org.eclipse.kapua.client.gateway.utils.Buffers; +import org.junit.Assert; +import org.junit.Test; + +public class BuffersTest { + + @Test + public void test1() { + final ByteBuffer result = Buffers.wrap(new byte[0]); + Assert.assertNotNull(result); + Assert.assertFalse(result.hasRemaining()); + } + + @Test + public void test2() { + final ByteBuffer result = Buffers.wrap(null); + Assert.assertNull(result); + } + + @Test + public void test3() { + final ByteBuffer result = Buffers.wrap(new byte[] { 12 }); + Assert.assertNotNull(result); + result.flip(); + Assert.assertEquals(1, result.remaining()); + Assert.assertEquals(12, result.get()); + } + + @Test + public void test4() { + final ByteBuffer input = ByteBuffer.allocate(1); + input.put((byte) 12); + + input.flip(); + + final byte[] result = Buffers.toByteArray(input); + Assert.assertNotNull(result); + Assert.assertEquals(1, result.length); + Assert.assertEquals(12, result[0]); + } + + @Test + public void test5() { + final byte[] result = Buffers.toByteArray(null); + Assert.assertNull(result); + } + +} diff --git a/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/StringsTest.java b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/StringsTest.java new file mode 100644 index 00000000000..665b36b760a --- /dev/null +++ b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/StringsTest.java @@ -0,0 +1,33 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +import org.eclipse.kapua.client.gateway.utils.Strings; +import org.junit.Test; + +public class StringsTest { + + @Test(expected = IllegalArgumentException.class) + public void test1() { + Strings.nonEmptyText(null, "foo"); + } + + @Test(expected = IllegalArgumentException.class) + public void test2() { + Strings.nonEmptyText("", "foo"); + } + + @Test + public void test3() { + Strings.nonEmptyText("foo", "foo"); + } +} diff --git a/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/TransportAsyncTest.java b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/TransportAsyncTest.java new file mode 100644 index 00000000000..ae9f1e53227 --- /dev/null +++ b/client/gateway/spi/src/test/java/org/eclipse/kapua/client/gateway/utils/TransportAsyncTest.java @@ -0,0 +1,63 @@ +/******************************************************************************* + * Copyright (c) 2017 Red Hat Inc and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Red Hat Inc - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.client.gateway.utils; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.kapua.client.gateway.Transport; +import org.eclipse.kapua.client.gateway.utils.TransportAsync; +import org.junit.Assert; +import org.junit.Test; + +public class TransportAsyncTest { + + @Test + public void test1() throws InterruptedException { + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + try { + final TransportAsync transport = new TransportAsync(executor); + + final Instant start = Instant.now(); + executor.schedule(() -> { + transport.handleConnected(); + }, 100, TimeUnit.MILLISECONDS); + + Transport.waitForConnection(transport); + + Duration duration = Duration.between(start, Instant.now()); + System.out.println(duration); + + Assert.assertTrue(duration.compareTo(Duration.ofMillis(100)) > 0); + + } finally { + executor.shutdown(); + } + } + + @Test + public void test2() throws InterruptedException { + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + try { + final TransportAsync transport = new TransportAsync(executor); + transport.events(events -> { + }); + } finally { + executor.shutdown(); + } + } +} diff --git a/client/pom.xml b/client/pom.xml new file mode 100644 index 00000000000..d3a1eca2ed0 --- /dev/null +++ b/client/pom.xml @@ -0,0 +1,30 @@ + + + + 4.0.0 + + + org.eclipse.kapua + kapua + 0.2.0-SNAPSHOT + + + kapua-client + pom + + + gateway + + + diff --git a/pom.xml b/pom.xml index 38ef7d426f4..6821f0c55f1 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,11 @@ clean install + + org.apache.felix + maven-bundle-plugin + 3.3.0 + org.apache.maven.plugins maven-assembly-plugin @@ -417,6 +422,36 @@ kapua-broker-core ${project.version} + + org.eclipse.kapua + kapua-client-gateway-api + ${project.version} + + + org.eclipse.kapua + kapua-client-gateway-profile-kura + ${project.version} + + + org.eclipse.kapua + kapua-client-gateway-provider-fuse + ${project.version} + + + org.eclipse.kapua + kapua-client-gateway-provider-mqtt + ${project.version} + + + org.eclipse.kapua + kapua-client-gateway-provider-paho + ${project.version} + + + org.eclipse.kapua + kapua-client-gateway-spi + ${project.version} + org.eclipse.kapua kapua-commons