Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ping request response interceptor #74

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7a27e87
Added Pingrequest Interceptor
RhinoBetatron Aug 12, 2019
1600de7
Cleaned up Tests for Pingrequest and Pingresponse InterceptorHandlers
RhinoBetatron Aug 12, 2019
847b8f6
Optimized Imports
RhinoBetatron Aug 13, 2019
3020c12
Adressed PR Comments
RhinoBetatron Aug 13, 2019
0b508b3
Refactoring to ChannelDuplexHandler
RhinoBetatron Aug 14, 2019
0b0fa70
Refactored Ping Request and Ping Response to be implemented by a sing…
RhinoBetatron Aug 14, 2019
f5c6168
Adjusted test cases for Ping Response and Ping Request
RhinoBetatron Aug 14, 2019
fe64efc
Refactoring
RhinoBetatron Aug 19, 2019
c188069
Updated doc
RhinoBetatron Aug 27, 2019
2d2366c
Adressed PR comments
RhinoBetatron Sep 6, 2019
db7749d
Adressed PR comments
RhinoBetatron Sep 10, 2019
15a95f9
Fixed Unit tests
RhinoBetatron Sep 10, 2019
a335f55
Applied Async patch
RhinoBetatron Sep 11, 2019
b81d256
Removed unnecessary Test
RhinoBetatron Sep 11, 2019
8e42387
Adressed PR Comments
RhinoBetatron Sep 11, 2019
503716f
Removed TODO
RhinoBetatron Sep 11, 2019
9ba4926
Adjusted Ping Interceptors with the help of Integrationtests
RhinoBetatron Sep 17, 2019
beef170
Fixed formatting error in AbstractChannelInitializer
RhinoBetatron Sep 20, 2019
22085b6
Removed unnecessary completing of future
RhinoBetatron Sep 20, 2019
06cd8bf
Fixed error in PingInterceptorHandlerTest
RhinoBetatron Sep 20, 2019
cf51281
Changed variable name in AbstractChannelInitializer to PING_INTERCEPT…
RhinoBetatron Sep 20, 2019
4e811bb
Added Methods to ClientContext
RhinoBetatron Sep 23, 2019
caa3c5f
Added Test in ClientContextImplTest for Ping Interception
RhinoBetatron Sep 23, 2019
51951ea
Renamed Ping interception related classes
RhinoBetatron Sep 25, 2019
6d6dcb6
Formated Client Context
RhinoBetatron Oct 1, 2019
0139993
Fixed typos in javadoc of client context
RhinoBetatron Oct 7, 2019
90000f0
Fixed doc typo in clientcontext
RhinoBetatron Oct 7, 2019
c35027d
Fixed formatting in channelHandlerNames
RhinoBetatron Oct 14, 2019
bbb1a66
Refactored pinginterceptorhandler
RhinoBetatron Nov 20, 2019
395cb36
Refactored pinginterceptorhandler
RhinoBetatron Nov 20, 2019
805342d
Adressed PR changes
RhinoBetatron Nov 20, 2019
52e46c8
Fixed broken inbound handling
RhinoBetatron Nov 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,7 +28,7 @@
* @since 4.0.0
*/
@DoNotImplement
public interface AsyncOutput<T> {
public interface AsyncOutput<T> extends SimpleAsyncOutput<T> {

/**
* If the timeout is expired before {@link Async#resume()} is called then the outcome is
Expand All @@ -43,15 +43,4 @@ public interface AsyncOutput<T> {
* @since 4.0.0
*/
@NotNull Async<T> async(@NotNull Duration timeout, @NotNull TimeoutFallback fallback);

/**
* If the timeout is expired before {@link Async#resume()} is called then the outcome is handled as failed.
* <p>
* Do not call this method more than once. If an async method is called multiple times an exception is thrown.
*
* @param timeout Timeout that HiveMQ waits for the result of the async operation.
* @throws UnsupportedOperationException If async is called more than once.
* @since 4.0.0
*/
@NotNull Async<T> async(@NotNull Duration timeout);
}
@@ -0,0 +1,27 @@
package com.hivemq.extension.sdk.api.async;

import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
import com.hivemq.extension.sdk.api.annotations.NotNull;

import java.time.Duration;

/**
* Enables an output object to be processed in a non-blocking way.
*
* @author Yannick Weber
*/
@DoNotImplement
public interface SimpleAsyncOutput<T> {

/**
* If the timeout is expired before {@link Async#resume()} is called then the outcome is handled as failed.
* <p>
* Do not call this method more than once. If an async method is called multiple times an exception is thrown.
*
* @param timeout Timeout that HiveMQ waits for the result of the async operation.
* @throws UnsupportedOperationException If async is called more than once.
* @since 4.0.0
*/
@NotNull Async<T> async(@NotNull Duration timeout);

}
Expand Up @@ -20,6 +20,8 @@
import com.hivemq.extension.sdk.api.annotations.Immutable;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.interceptor.Interceptor;
import com.hivemq.extension.sdk.api.interceptor.pingrequest.PingReqInboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.pingresponse.PingRespOutboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.publish.PublishInboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.publish.PublishOutboundInterceptor;
import com.hivemq.extension.sdk.api.interceptor.subscribe.SubscribeInboundInterceptor;
Expand All @@ -39,6 +41,24 @@
@DoNotImplement
public interface ClientContext {

/**
* Adds an {@link PingReqInboundInterceptor} for this client. <br> Subsequent adding of the same interceptor
* will be ignored.
*
* @param pingReqInboundInterceptor The implementation of a PingReqInInboundInterceptor.
* @throws NullPointerException If the interceptor is null.
*/
void addPingReqInboundInterceptor(@NotNull PingReqInboundInterceptor pingReqInboundInterceptor);

/**
* Adds an {@link PingRespOutboundInterceptor} for this client. <br> Subsequent adding of the same interceptor
* will be ignored.
*
* @param pingRespOutboundInterceptor The implementation of a PingRespOutboundInterceptor.
* @throws NullPointerException If the interceptor is null.
*/
void addPingRespOutboundInterceptor(@NotNull PingRespOutboundInterceptor pingRespOutboundInterceptor);

/**
* Adds an {@link PublishInboundInterceptor} for this client. <br>
* Subsequent adding of the same interceptor will be ignored.
Expand Down Expand Up @@ -100,6 +120,25 @@ public interface ClientContext {
*/
void removeSubscribeInboundInterceptor(@NotNull SubscribeInboundInterceptor subscribeInboundInterceptor);

/**
* Removes a {@link PingRespOutboundInterceptor} for this client. <br> Nothing happens if the interceptor that
* should be removed, has not been added in the first place.
*
* @param pingRespOutboundInterceptor The implementation of a PingResponseOutboundInterceptor.
* @throws NullPointerException If the interceptor is null.
*/
void removePingRespOutboundInterceptor(
@NotNull PingRespOutboundInterceptor pingRespOutboundInterceptor);

/**
* Removes a {@link PingReqInboundInterceptor} for this client. <br> Nothing happens if the interceptor that
* should be removed, has not been added in the first place.
*
* @param pingReqInboundInterceptor The implementation of a PingRequestInboundInterceptor.
* @throws NullPointerException If the interceptor is null.
*/
void removePingReqInboundInterceptor(@NotNull PingReqInboundInterceptor pingReqInboundInterceptor);

/**
* Returns all {@link Interceptor} which are registered for this client.
*
Expand Down Expand Up @@ -136,6 +175,23 @@ public interface ClientContext {
@Immutable
@NotNull List<@NotNull SubscribeInboundInterceptor> getSubscribeInboundInterceptors();

/**
* Returns all {@link PingReqInboundInterceptor} which are registered for this client by this extension.
*
* @return List of PingReqInboundInterceptors for this client.
*/
@Immutable
@NotNull List<@NotNull PingReqInboundInterceptor> getPingReqInboundInterceptors();

/**
* Returns all {@link PingRespOutboundInterceptor} which are registered for this client by this extension.
*
* @return List of PingRespOutboundInterceptors for this client.
*/
@Immutable
@NotNull List<@NotNull PingRespOutboundInterceptor> getPingRespOutboundInterceptors();


/**
* The default permissions for this client. Default permissions are automatically applied by HiveMQ for every
* MQTT PUBLISH and SUBSCRIBE packet sent by this client.
Expand Down
@@ -0,0 +1,32 @@
package com.hivemq.extension.sdk.api.interceptor.pingrequest;

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.interceptor.Interceptor;
import com.hivemq.extension.sdk.api.interceptor.pingrequest.parameter.PingReqInboundInput;
import com.hivemq.extension.sdk.api.interceptor.pingrequest.parameter.PingReqInboundOutput;

/**
* Interface for the ping request inbound interception.
* <p>
* Interceptors are always called by the same Thread for all messages from the same client.
* <p>
* If the same instance is shared between multiple clients it can be called in different Threads and must therefore be
* thread-safe.
*
* @author Robin Atherton
*/
@FunctionalInterface
public interface PingReqInboundInterceptor extends Interceptor {

/**
* When a {@link PingReqInboundInterceptor} is set through any extension, this method gets called for every
* inbound PINGREQ packet from any MQTT client.
*
* @param pingReqInboundInput The {@link PingReqInboundInput} parameter.
* @param pingReqInboundOutput The {@link PingReqInboundOutput} parameter.
*/
void onInboundPingReq(
@NotNull PingReqInboundInput pingReqInboundInput,
@NotNull PingReqInboundOutput pingReqInboundOutput);

}
@@ -0,0 +1,15 @@
package com.hivemq.extension.sdk.api.interceptor.pingrequest.parameter;

import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
import com.hivemq.extension.sdk.api.interceptor.pingrequest.PingReqInboundInterceptor;
import com.hivemq.extension.sdk.api.parameter.ClientBasedInput;

/**
* This is the input parameter of any {@link PingReqInboundInterceptor}
*
* @author Robin Atherton
*/
@DoNotImplement
public interface PingReqInboundInput extends ClientBasedInput {
RobinAtherton marked this conversation as resolved.
Show resolved Hide resolved

}
@@ -0,0 +1,15 @@
package com.hivemq.extension.sdk.api.interceptor.pingrequest.parameter;

import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
import com.hivemq.extension.sdk.api.async.SimpleAsyncOutput;
import com.hivemq.extension.sdk.api.interceptor.pingrequest.PingReqInboundInterceptor;

/**
* This is the output parameter of any {@link PingReqInboundInterceptor}.
*
* @author Robin Atherton
*/
@DoNotImplement
public interface PingReqInboundOutput extends SimpleAsyncOutput<PingReqInboundOutput> {
RobinAtherton marked this conversation as resolved.
Show resolved Hide resolved

}
@@ -0,0 +1,33 @@
package com.hivemq.extension.sdk.api.interceptor.pingresponse;

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.interceptor.Interceptor;
import com.hivemq.extension.sdk.api.interceptor.pingresponse.parameter.PingRespOutboundInput;
import com.hivemq.extension.sdk.api.interceptor.pingresponse.parameter.PingRespOutboundOutput;

/**
* Interface for the ping response interception.
* <p>
* Interceptors are always called by the same thread for all messages for the same client.
* <p>
* If the same instance is shared between multiple clients it can be called by different threads and therefore must be
* thread-safe.
* <p>
*
* @author Robin Atherton
*/
@FunctionalInterface
public interface PingRespOutboundInterceptor extends Interceptor {

/**
* When a {@link PingRespOutboundInterceptor} is set through any extension, this method gets called for every
* outbound PINGRESP packet from any MQTT client.
*
* @param pingRespOutboundInput The {@link PingRespOutboundInput} parameter.
* @param pingRespOutboundOutput The {@link PingRespOutboundOutput} parameter.
*/
void onOutboundPingResp(
@NotNull PingRespOutboundInput pingRespOutboundInput,
@NotNull PingRespOutboundOutput pingRespOutboundOutput);

}
@@ -0,0 +1,16 @@
package com.hivemq.extension.sdk.api.interceptor.pingresponse.parameter;

import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
import com.hivemq.extension.sdk.api.interceptor.pingresponse.PingRespOutboundInterceptor;
import com.hivemq.extension.sdk.api.parameter.ClientBasedInput;

/**
* This is the input parameter of any {@link PingRespOutboundInterceptor}.
*
* @author Robin Atherton
*/
@DoNotImplement
public interface PingRespOutboundInput extends ClientBasedInput {
RobinAtherton marked this conversation as resolved.
Show resolved Hide resolved


}
@@ -0,0 +1,14 @@
package com.hivemq.extension.sdk.api.interceptor.pingresponse.parameter;

import com.hivemq.extension.sdk.api.annotations.DoNotImplement;
import com.hivemq.extension.sdk.api.async.SimpleAsyncOutput;
import com.hivemq.extension.sdk.api.interceptor.pingresponse.PingRespOutboundInterceptor;

/**
* This is the output parameter of any {@link PingRespOutboundInterceptor}.
*
* @author Robin Atherton
*/
@DoNotImplement
public interface PingRespOutboundOutput extends SimpleAsyncOutput<PingRespOutboundOutput> {
RobinAtherton marked this conversation as resolved.
Show resolved Hide resolved
}
14 changes: 10 additions & 4 deletions src/main/java/com/hivemq/bootstrap/netty/ChannelDependencies.java
Expand Up @@ -25,10 +25,6 @@
import com.hivemq.configuration.service.FullConfigurationService;
import com.hivemq.configuration.service.RestrictionsConfigurationService;
import com.hivemq.extensions.handler.*;
import com.hivemq.extensions.handler.ClientLifecycleEventHandler;
import com.hivemq.extensions.handler.IncomingPublishHandler;
import com.hivemq.extensions.handler.IncomingSubscribeHandler;
import com.hivemq.extensions.handler.PluginInitializerHandler;
import com.hivemq.logging.EventLog;
import com.hivemq.metrics.MetricsHolder;
import com.hivemq.metrics.handler.MetricsInitializer;
Expand Down Expand Up @@ -156,6 +152,9 @@ public class ChannelDependencies {
@NotNull
private final Provider<IncomingSubscribeHandler> incomingSubscribeHandlerProvider;

@NotNull
private final PingInterceptorHandler pingInterceptorHandler;

@NotNull
private final PublishOutboundInterceptorHandler publishOutboundInterceptorHandler;

Expand Down Expand Up @@ -199,6 +198,7 @@ public ChannelDependencies(
@NotNull final Provider<IncomingPublishHandler> incomingPublishHandlerProvider,
@NotNull final Provider<IncomingSubscribeHandler> incomingSubscribeHandlerProvider,
@NotNull final Provider<PublishMessageExpiryHandler> publishMessageExpiryHandlerProvider,
@NotNull final PingInterceptorHandler pingInterceptorHandler,
@NotNull final PublishOutboundInterceptorHandler publishOutboundInterceptorHandler,
@NotNull final ConnectInboundInterceptorHandler connectInboundInterceptorHandler,
@NotNull final ConnackOutboundInterceptorHandler connackOutboundInterceptorHandler) {
Expand Down Expand Up @@ -236,6 +236,7 @@ public ChannelDependencies(
this.incomingPublishHandlerProvider = incomingPublishHandlerProvider;
this.incomingSubscribeHandlerProvider = incomingSubscribeHandlerProvider;
this.publishMessageExpiryHandlerProvider = publishMessageExpiryHandlerProvider;
this.pingInterceptorHandler = pingInterceptorHandler;
this.publishOutboundInterceptorHandler = publishOutboundInterceptorHandler;
this.connectInboundInterceptorHandler = connectInboundInterceptorHandler;
this.connackOutboundInterceptorHandler = connackOutboundInterceptorHandler;
Expand Down Expand Up @@ -415,4 +416,9 @@ public ConnectInboundInterceptorHandler getConnectInboundInterceptorHandler() {
public ConnackOutboundInterceptorHandler getConnackOutboundInterceptorHandler() {
return connackOutboundInterceptorHandler;
}

@NotNull
public PingInterceptorHandler getPingInterceptorHandler() {
return pingInterceptorHandler;
}
}
Expand Up @@ -104,6 +104,7 @@ public class ChannelHandlerNames {
public static final String MQTT_MESSAGE_BARRIER = "mqtt_message_barrier";
public static final String MQTT_SUBSCRIBE_MESSAGE_BARRIER = "mqtt_subscribe_message_barrier";
public static final String STOP_READING_AFTER_CONNECT_HANDLER = "stop_reading_after_connect_handler";
public static final String PING_INTERCEPTOR_HANDLER = "pingrequest_pingresponse_interceptor_handler";

/* *************
* Extensions *
Expand Down
Expand Up @@ -91,6 +91,8 @@ protected void initChannel(@NotNull final Channel ch) throws Exception {
ch.pipeline().addLast(CONNECT_INBOUND_INTERCEPTOR_HANDLER, channelDependencies.getConnectInboundInterceptorHandler());
ch.pipeline().addLast(CLIENT_LIFECYCLE_EVENT_HANDLER, channelDependencies.getClientLifecycleEventHandler());

ch.pipeline().addLast(PING_INTERCEPTOR_HANDLER, channelDependencies.getPingInterceptorHandler());

ch.pipeline().addLast(PUBLISH_OUTBOUND_INTERCEPTOR_HANDLER, channelDependencies.getPublishOutboundInterceptorHandler());

ch.pipeline().addLast(LISTENER_ATTRIBUTE_ADDER, channelDependencies.getListenerAttributeAdderFactory().get(listener));
Expand Down