Skip to content

Commit

Permalink
Mqtt5 Listener (#590)
Browse files Browse the repository at this point in the history
* initial client listener

* set listener options parameter ids

* clang-format

* update class ids

* binding listenerPublishEvents; add mqtt5 utils

* setup client in mqtt5 listener

* set the correct booleanmethod

* update comments and rename listener variable name

* clangformat

* add mqtt5listeneroptionsBuilder, update cr

* update comments for PublishEvents

* clang-format

* fix validation order

* adding the first listener test

* Revert "adding the first listener test"

This reverts commit d049df6.
  • Loading branch information
xiazhvera committed Feb 28, 2023
1 parent b285819 commit d9a3ff0
Show file tree
Hide file tree
Showing 10 changed files with 1,695 additions and 724 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,19 @@
*/
package software.amazon.awssdk.crt.mqtt5;

import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.io.ExponentialBackoffRetryOptions.JitterMode;

import software.amazon.awssdk.crt.mqtt5.packets.ConnectPacket;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.function.Consumer;

/**
* Configuration for the creation of Mqtt5Clients
*
Expand Down Expand Up @@ -337,7 +336,9 @@ public interface LifecycleEvents {
*/
public interface PublishEvents {
/**
* Called when an MQTT PUBLISH packet is received by the client
* Called when an MQTT PUBLISH packet is received by the client. If the PublishReturn has been already handled by
* a service client, then the callback will not get invoked.
* Checkout Mqtt5ListenerOptions.ListenerPublishEvents for more details.
*
* @param client The client that has received the message
* @param publishReturn All of the data that was received from the server
Expand Down
73 changes: 73 additions & 0 deletions src/main/java/software/amazon/awssdk/crt/mqtt5/Mqtt5Listener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt5;

import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;


/**
* This class wraps the aws-c-mqtt MQTT5 client to provide the basic MQTT5 pub/sub functionalities
* via the AWS Common Runtime
*
* One Mqtt5Listener class creates one connection.
*
* MQTT5 support is currently in <b>developer preview</b>. We encourage feedback at all times, but feedback during the
* preview window is especially valuable in shaping the final product. During the preview period we may make
* backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
*/
public class Mqtt5Listener extends CrtResource {
private Mqtt5Client clientReference = null;

/**
* Creates a Mqtt5Listener instance using the provided Mqtt5ListenerOptions. Once the Mqtt5Listener is created,
* changing the settings will not cause a change in already created Mqtt5Listener's.
*
* @param options The Mqtt5ListenerOptions class to use to configure the new Mqtt5Listener.
* @param client The Mqtt5Client class the mqtt5 listener listen to
* @throws CrtRuntimeException If the system is unable to allocate space for a native MQTT5 client structure
*/
public Mqtt5Listener(Mqtt5ListenerOptions options, Mqtt5Client client) throws CrtRuntimeException {

clientReference = client;
acquireNativeHandle(mqtt5ListenerNew(
options,
client,
this
));
// addRefrenceTo client after the native handler get acquired
addReferenceTo(client);
}

/**
* Cleans up the native resources associated with this client. The client is unusable after this call
*/
@Override
protected void releaseNativeHandle() {
if (!isNull()) {
mqtt5ListenerDestroy(getNativeHandle());
removeReferenceTo(clientReference);
}
}

/**
* Determines whether a resource releases its dependencies at the same time the native handle is released or if it waits.
* Resources that wait are responsible for calling releaseReferences() manually.
*/
@Override
protected boolean canReleaseReferencesImmediately() { return false; }


/*******************************************************************************
* native methods
******************************************************************************/
private static native long mqtt5ListenerNew(
Mqtt5ListenerOptions options,
Mqtt5Client client,
Mqtt5Listener listener
) throws CrtRuntimeException;
private static native void mqtt5ListenerDestroy(long listener);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
package software.amazon.awssdk.crt.mqtt5;

import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions.LifecycleEvents;

/**
* Configuration for the creation of Mqtt5Listener
*
* MQTT5 support is currently in <b>developer preview</b>. We encourage feedback at all times, but feedback during the
* preview window is especially valuable in shaping the final product. During the preview period we may make
* backwards-incompatible changes to the public API, but in general, this is something we will try our best to avoid.
*/
public class Mqtt5ListenerOptions {

private LifecycleEvents lifecycleEvents;
private ListenerPublishEvents listenerPublishEvents;

/**
* Returns the LifecycleEvents interface that will be called when the client gets a LifecycleEvent.
*
* @return The LifecycleEvents interface that will be called when the client gets a LifecycleEvent
*/
public LifecycleEvents getLifecycleEvents() {
return this.lifecycleEvents;
}

/**
* Returns the PublishEvents interface that will be called when the client gets a message.
*
* @return PublishEvents interface that will be called when the client gets a message.
*/
public ListenerPublishEvents getListenerPublishEvents() {
return this.listenerPublishEvents;
}


public Mqtt5ListenerOptions(Mqtt5ListenerOptionsBuilder builder)
{
this.lifecycleEvents = builder.lifecycleEvents;
this.listenerPublishEvents = builder.listenerPublishEvents;
}

/*******************************************************************************
* callback methods
******************************************************************************/

/**
* An interface that defines all of the publish functions the Mqtt5Client will call when it receives a publish packet.
*/
public interface ListenerPublishEvents {
/**
* Called when an MQTT PUBLISH packet is received by the client
*
* @param client The client that has received the message
* @param publishReturn All of the data that was received from the server
*
* @return return true if the message get processed, otherwise false
* If the message get processed, it will not passed down anymore. The client
* and other listener would not get the publishEvent anymore.
*/
public boolean onMessageReceived(Mqtt5Client client, PublishReturn publishReturn);
}

/*******************************************************************************
* Builder
******************************************************************************/

/**
* All of the options for a Mqtt5Listener. This includes the settings to make a connection, as well as the
* event callbacks, publish callbacks, and more.
*/
static final public class Mqtt5ListenerOptionsBuilder {
private LifecycleEvents lifecycleEvents;
private ListenerPublishEvents listenerPublishEvents;

/**
* Sets the Lifecycle Events interface that will be called when the client gets a LifecycleEvent.
*
* @param lifecycleEvents The LifecycleEvents interface that will be called
* @return The Mqtt5ListenerOptionsBuilder after setting the Lifecycle Events interface
*/
public Mqtt5ListenerOptionsBuilder withLifecycleEvents(LifecycleEvents lifecycleEvents) {
this.lifecycleEvents = lifecycleEvents;
return this;
}


/**
* Sets the ListenerPublishEvents interface that will be called when the client gets a message.
*
* @param publishEvents The ListenerPublishEvents interface that will be called when the client gets a message.
* @return The Mqtt5ListenerOptionsBuilder after setting the PublishEvents interface
*/
public Mqtt5ListenerOptionsBuilder withListenerPublishEvents(ListenerPublishEvents publishEvents) {
this.listenerPublishEvents = publishEvents;
return this;
}


/**
* Returns a Mqtt5ListenerOptions class configured with all of the options set in the Mqtt5ListenerOptions.
* This can then be used to make a new Mqtt5Client.
*
* @return A configured Mqtt5ListenerOptions
*/
public Mqtt5ListenerOptions build()
{
return new Mqtt5ListenerOptions(this);
}

}

}


40 changes: 40 additions & 0 deletions src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,22 @@ static void s_cache_mqtt5_publish_events_properties(JNIEnv *env) {
AWS_FATAL_ASSERT(mqtt5_publish_events_properties.publish_events_publish_received_id);
}

struct java_aws_mqtt5_listener_publish_events mqtt5_listener_publish_events_properties;

static void s_cache_mqtt5_listener_publish_events_properties(JNIEnv *env) {
jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/mqtt5/Mqtt5ListenerOptions$ListenerPublishEvents");
AWS_FATAL_ASSERT(cls);
mqtt5_listener_publish_events_properties.listener_publish_events_class = (*env)->NewGlobalRef(env, cls);
AWS_FATAL_ASSERT(mqtt5_listener_publish_events_properties.listener_publish_events_class);
// Functions
mqtt5_listener_publish_events_properties.listener_publish_events_publish_received_id = (*env)->GetMethodID(
env,
mqtt5_listener_publish_events_properties.listener_publish_events_class,
"onMessageReceived",
"(Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5Client;Lsoftware/amazon/awssdk/crt/mqtt5/PublishReturn;)Z");
AWS_FATAL_ASSERT(mqtt5_listener_publish_events_properties.listener_publish_events_publish_received_id);
}

struct java_aws_mqtt5_lifecycle_events mqtt5_lifecycle_events_properties;

static void s_cache_mqtt5_lifecycle_events_properties(JNIEnv *env) {
Expand Down Expand Up @@ -2065,6 +2081,28 @@ static void s_cache_mqtt5_on_disconnection_return(JNIEnv *env) {
AWS_FATAL_ASSERT(mqtt5_on_disconnection_return_properties.return_constructor_id);
}

struct java_aws_mqtt5_listener_options_properties mqtt5_listener_options_properties;

static void s_cache_mqtt5_listener_options(JNIEnv *env) {
jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/mqtt5/Mqtt5ListenerOptions");
AWS_FATAL_ASSERT(cls);
mqtt5_listener_options_properties.listener_options_class = (*env)->NewGlobalRef(env, cls);
AWS_FATAL_ASSERT(mqtt5_listener_options_properties.listener_options_class);
// Functions
mqtt5_listener_options_properties.listener_publish_events_field_id = (*env)->GetFieldID(
env,
mqtt5_listener_options_properties.listener_options_class,
"listenerPublishEvents",
"Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5ListenerOptions$ListenerPublishEvents;");
AWS_FATAL_ASSERT(mqtt5_listener_options_properties.listener_publish_events_field_id);
mqtt5_listener_options_properties.lifecycle_events_field_id = (*env)->GetFieldID(
env,
mqtt5_listener_options_properties.listener_options_class,
"lifecycleEvents",
"Lsoftware/amazon/awssdk/crt/mqtt5/Mqtt5ClientOptions$LifecycleEvents;");
AWS_FATAL_ASSERT(mqtt5_listener_options_properties.lifecycle_events_field_id);
}

struct java_boxed_integer_properties boxed_integer_properties;

static void s_cache_boxed_integer(JNIEnv *env) {
Expand Down Expand Up @@ -2212,6 +2250,7 @@ void cache_java_class_ids(JNIEnv *env) {
s_cache_mqtt5_unsuback_reason_code(env);
s_cache_mqtt5_user_property(env);
s_cache_mqtt5_publish_events_properties(env);
s_cache_mqtt5_listener_publish_events_properties(env);
s_cache_mqtt5_lifecycle_events_properties(env);
s_cache_mqtt5_puback_result(env);
s_cache_mqtt5_publish_return(env);
Expand All @@ -2220,6 +2259,7 @@ void cache_java_class_ids(JNIEnv *env) {
s_cache_mqtt5_on_connection_success_return(env);
s_cache_mqtt5_on_connection_failure_return(env);
s_cache_mqtt5_on_disconnection_return(env);
s_cache_mqtt5_listener_options(env);
s_cache_boxed_integer(env);
s_cache_boxed_boolean(env);
s_cache_boxed_list(env);
Expand Down
16 changes: 16 additions & 0 deletions src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,13 @@ struct java_aws_mqtt5_publish_events {
};
extern struct java_aws_mqtt5_publish_events mqtt5_publish_events_properties;

/* mqtt5.Mqtt5ListenerOptions.PublishEvents */
struct java_aws_mqtt5_listener_publish_events {
jclass listener_publish_events_class;
jmethodID listener_publish_events_publish_received_id;
};
extern struct java_aws_mqtt5_listener_publish_events mqtt5_listener_publish_events_properties;

/* mqtt5.Mqtt5ClientOptions.LifecycleEvents */
struct java_aws_mqtt5_lifecycle_events {
jclass lifecycle_events_class;
Expand Down Expand Up @@ -863,6 +870,15 @@ struct java_aws_mqtt5_on_disconnection_return_properties {
};
extern struct java_aws_mqtt5_on_disconnection_return_properties mqtt5_on_disconnection_return_properties;

/* mqtt5.ListenerOptions */
struct java_aws_mqtt5_listener_options_properties {
jclass listener_options_class;

jfieldID listener_publish_events_field_id;
jfieldID lifecycle_events_field_id;
};
extern struct java_aws_mqtt5_listener_options_properties mqtt5_listener_options_properties;

/* java/lang/Integer */
struct java_boxed_integer_properties {
jclass integer_class;
Expand Down
Loading

0 comments on commit d9a3ff0

Please sign in to comment.