Skip to content

Commit ffd8dd2

Browse files
committedAug 10, 2021
Mqtt3/5Publish.getPayload Optional<ByteBuffer> -> ByteBuffer
Mqtt3/5PublishBuilderBase.payload expects NotNull arguments
1 parent 00de5b5 commit ffd8dd2

25 files changed

+211
-224
lines changed
 

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/decoder/mqtt3/Mqtt3PublishDecoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class Mqtt3PublishDecoder implements MqttMessageDecoder {
6767
final int packetIdentifier = decodePublishPacketIdentifier(qos, in);
6868

6969
final int payloadLength = in.readableBytes();
70-
ByteBuffer payload = null;
70+
ByteBuffer payload = ByteBufferUtil.EMPTY_BYTE_BUFFER;
7171
if (payloadLength > 0) {
7272
payload = ByteBufferUtil.allocate(payloadLength, context.useDirectBufferPayload());
7373
in.readBytes(payload);

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/decoder/mqtt5/Mqtt5PublishDecoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public class Mqtt5PublishDecoder implements MqttMessageDecoder {
191191
}
192192

193193
final int payloadLength = in.readableBytes();
194-
ByteBuffer payload = null;
194+
ByteBuffer payload = ByteBufferUtil.EMPTY_BYTE_BUFFER;
195195
if (payloadLength > 0) {
196196
payload = ByteBufferUtil.allocate(payloadLength, context.useDirectBufferPayload());
197197
in.readBytes(payload);

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/encoder/MqttMessageEncoderUtil.java

-12
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ public static int nullableEncodedLength(final @Nullable ByteBuffer byteBuffer) {
6262
return (byteBuffer == null) ? 0 : MqttBinaryData.encodedLength(byteBuffer);
6363
}
6464

65-
public static int encodedOrEmptyLength(final @Nullable ByteBuffer byteBuffer) {
66-
return (byteBuffer == null) ? MqttBinaryData.EMPTY_LENGTH : MqttBinaryData.encodedLength(byteBuffer);
67-
}
68-
6965
public static void encodeNullable(final @Nullable MqttUtf8StringImpl string, final @NotNull ByteBuf out) {
7066
if (string != null) {
7167
string.encode(out);
@@ -78,14 +74,6 @@ public static void encodeNullable(final @Nullable ByteBuffer byteBuffer, final @
7874
}
7975
}
8076

81-
public static void encodeOrEmpty(final @Nullable ByteBuffer byteBuffer, final @NotNull ByteBuf out) {
82-
if (byteBuffer != null) {
83-
MqttBinaryData.encode(byteBuffer, out);
84-
} else {
85-
MqttBinaryData.encodeEmpty(out);
86-
}
87-
}
88-
8977
public static @NotNull MqttEncodeException maximumPacketSizeExceeded(
9078
final @NotNull MqttMessage message, final int encodedLength, final int maxPacketSize) {
9179

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/encoder/mqtt3/Mqtt3ConnectEncoder.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.hivemq.client2.internal.mqtt.codec.encoder.mqtt3;
1818

19+
import com.hivemq.client2.internal.mqtt.datatypes.MqttBinaryData;
1920
import com.hivemq.client2.internal.mqtt.datatypes.MqttUtf8StringImpl;
2021
import com.hivemq.client2.internal.mqtt.datatypes.MqttVariableByteInteger;
2122
import com.hivemq.client2.internal.mqtt.message.auth.MqttSimpleAuth;
@@ -29,7 +30,8 @@
2930
import javax.inject.Inject;
3031
import javax.inject.Singleton;
3132

32-
import static com.hivemq.client2.internal.mqtt.codec.encoder.MqttMessageEncoderUtil.*;
33+
import static com.hivemq.client2.internal.mqtt.codec.encoder.MqttMessageEncoderUtil.encodeNullable;
34+
import static com.hivemq.client2.internal.mqtt.codec.encoder.MqttMessageEncoderUtil.nullableEncodedLength;
3335

3436
/**
3537
* @author Silvio Giebl
@@ -62,7 +64,7 @@ int remainingLength(final @NotNull MqttStatefulConnect message) {
6264
final MqttWillPublish willPublish = stateless.getRawWillPublish();
6365
if (willPublish != null) {
6466
remainingLength += willPublish.getTopic().encodedLength();
65-
remainingLength += encodedOrEmptyLength(willPublish.getRawPayload());
67+
remainingLength += MqttBinaryData.encodedLength(willPublish.getRawPayload());
6668
}
6769

6870
return remainingLength;
@@ -132,7 +134,7 @@ private void encodeWillPublish(final @NotNull MqttStatefulConnect message, final
132134
final MqttWillPublish willPublish = message.stateless().getRawWillPublish();
133135
if (willPublish != null) {
134136
willPublish.getTopic().encode(out);
135-
encodeNullable(willPublish.getRawPayload(), out);
137+
MqttBinaryData.encode(willPublish.getRawPayload(), out);
136138
}
137139
}
138140
}

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/encoder/mqtt3/Mqtt3PublishEncoder.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ int remainingLength(final @NotNull MqttStatefulPublish message) {
5353
remainingLength += 2;
5454
}
5555

56-
final ByteBuffer payload = stateless.getRawPayload();
57-
if (payload != null) {
58-
remainingLength += payload.remaining();
59-
}
56+
remainingLength += stateless.getRawPayload().remaining();
6057

6158
return remainingLength;
6259
}
@@ -69,7 +66,7 @@ int remainingLength(final @NotNull MqttStatefulPublish message) {
6966
final int remainingLength) {
7067

7168
final ByteBuffer payload = message.stateless().getRawPayload();
72-
if ((payload != null) && payload.isDirect()) {
69+
if (payload.hasRemaining() && payload.isDirect()) {
7370
final int encodedLengthWithoutPayload = encodedLength - payload.remaining();
7471
final ByteBuf out =
7572
context.getAllocator().ioBuffer(encodedLengthWithoutPayload, encodedLengthWithoutPayload);
@@ -119,7 +116,7 @@ private void encodeVariableHeader(final @NotNull MqttStatefulPublish message, fi
119116

120117
private void encodePayload(final @NotNull MqttStatefulPublish message, final @NotNull ByteBuf out) {
121118
final ByteBuffer payload = message.stateless().getRawPayload();
122-
if ((payload != null) && !payload.isDirect()) {
119+
if (payload.hasRemaining() && !payload.isDirect()) {
123120
out.writeBytes(payload.duplicate());
124121
}
125122
}

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/encoder/mqtt5/Mqtt5ConnectEncoder.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.hivemq.client2.internal.mqtt.codec.encoder.MqttEncoderContext;
2020
import com.hivemq.client2.internal.mqtt.codec.encoder.MqttMessageEncoder;
21+
import com.hivemq.client2.internal.mqtt.datatypes.MqttBinaryData;
2122
import com.hivemq.client2.internal.mqtt.datatypes.MqttUtf8StringImpl;
2223
import com.hivemq.client2.internal.mqtt.datatypes.MqttVariableByteInteger;
2324
import com.hivemq.client2.internal.mqtt.message.auth.MqttEnhancedAuth;
@@ -118,7 +119,7 @@ private int remainingLengthWithoutProperties(final @NotNull MqttStatefulConnect
118119
final MqttWillPublish willPublish = stateless.getRawWillPublish();
119120
if (willPublish != null) {
120121
remainingLength += willPublish.getTopic().encodedLength();
121-
remainingLength += encodedOrEmptyLength(willPublish.getRawPayload());
122+
remainingLength += MqttBinaryData.encodedLength(willPublish.getRawPayload());
122123
}
123124

124125
return remainingLength;
@@ -316,7 +317,7 @@ private void encodeWillPublish(
316317
MqttWillPublish.DEFAULT_DELAY_INTERVAL, out);
317318

318319
willPublish.getTopic().encode(out);
319-
encodeOrEmpty(willPublish.getRawPayload(), out);
320+
MqttBinaryData.encode(willPublish.getRawPayload(), out);
320321
}
321322
}
322323
}

‎src/main/java/com/hivemq/client2/internal/mqtt/codec/encoder/mqtt5/Mqtt5PublishEncoder.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,7 @@ int remainingLengthWithoutProperties(final @NotNull MqttStatefulPublish message)
6464
remainingLength += 2;
6565
}
6666

67-
final ByteBuffer payload = stateless.getRawPayload();
68-
if (payload != null) {
69-
remainingLength += payload.remaining();
70-
}
67+
remainingLength += stateless.getRawPayload().remaining();
7168

7269
return remainingLength;
7370
}
@@ -111,7 +108,7 @@ final int fixedPropertyLength(final @NotNull MqttPublish publish) {
111108
final int omittedProperties) {
112109

113110
final ByteBuffer payload = message.stateless().getRawPayload();
114-
if ((payload != null) && payload.isDirect()) {
111+
if (payload.hasRemaining() && payload.isDirect()) {
115112
final int encodedLengthWithoutPayload = encodedLength - payload.remaining();
116113
final ByteBuf out =
117114
context.getAllocator().ioBuffer(encodedLengthWithoutPayload, encodedLengthWithoutPayload);
@@ -207,7 +204,7 @@ final void encodeFixedProperties(
207204

208205
private void encodePayload(final @NotNull MqttStatefulPublish message, final @NotNull ByteBuf out) {
209206
final ByteBuffer payload = message.stateless().getRawPayload();
210-
if ((payload != null) && !payload.isDirect()) {
207+
if (payload.hasRemaining() && !payload.isDirect()) {
211208
out.writeBytes(payload.duplicate());
212209
}
213210
}

‎src/main/java/com/hivemq/client2/internal/mqtt/datatypes/MqttBinaryData.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.hivemq.client2.internal.mqtt.datatypes;
1818

19+
import com.hivemq.client2.internal.util.ByteArrayUtil;
1920
import com.hivemq.client2.internal.util.ByteBufferUtil;
2021
import io.netty.buffer.ByteBuf;
2122
import org.jetbrains.annotations.NotNull;
@@ -44,6 +45,9 @@ public final class MqttBinaryData {
4445
return null;
4546
}
4647
final int length = byteBuf.readUnsignedShort();
48+
if (length == 0) {
49+
return ByteArrayUtil.EMPTY_BYTE_ARRAY;
50+
}
4751
if (byteBuf.readableBytes() < length) {
4852
return null;
4953
}
@@ -64,6 +68,9 @@ public final class MqttBinaryData {
6468
return null;
6569
}
6670
final int length = byteBuf.readUnsignedShort();
71+
if (length == 0) {
72+
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
73+
}
6774
if (byteBuf.readableBytes() < length) {
6875
return null;
6976
}
@@ -95,8 +102,12 @@ public static void encode(final byte @NotNull [] binary, final @NotNull ByteBuf
95102
* @param byteBuf the byte buffer to encode to.
96103
*/
97104
public static void encode(final @NotNull ByteBuffer byteBuffer, final @NotNull ByteBuf byteBuf) {
98-
byteBuf.writeShort(byteBuffer.remaining());
99-
byteBuf.writeBytes(byteBuffer.duplicate());
105+
if (byteBuffer.hasRemaining()) { // avoid calling byteBuffer.duplicate()
106+
byteBuf.writeShort(byteBuffer.remaining());
107+
byteBuf.writeBytes(byteBuffer.duplicate());
108+
} else {
109+
encodeEmpty(byteBuf);
110+
}
100111
}
101112

102113
/**

‎src/main/java/com/hivemq/client2/internal/mqtt/message/publish/MqttPublish.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P
5353
public static final long NO_MESSAGE_EXPIRY = -1;
5454

5555
private final @NotNull MqttTopicImpl topic;
56-
private final @Nullable ByteBuffer payload;
56+
private final @NotNull ByteBuffer payload;
5757
private final @NotNull MqttQos qos;
5858
private final boolean retain;
5959
private final @Range(from = -1, to = UnsignedDataTypes.UNSIGNED_INT_MAX_VALUE) long messageExpiryInterval;
@@ -66,7 +66,7 @@ public class MqttPublish extends MqttMessageWithUserProperties implements Mqtt5P
6666

6767
public MqttPublish(
6868
final @NotNull MqttTopicImpl topic,
69-
final @Nullable ByteBuffer payload,
69+
final @NotNull ByteBuffer payload,
7070
final @NotNull MqttQos qos,
7171
final boolean retain,
7272
final @Range(from = -1, to = UnsignedDataTypes.UNSIGNED_INT_MAX_VALUE) long messageExpiryInterval,
@@ -96,11 +96,11 @@ public MqttPublish(
9696
}
9797

9898
@Override
99-
public @NotNull Optional<ByteBuffer> getPayload() {
100-
return ByteBufferUtil.optionalReadOnly(payload);
99+
public @NotNull ByteBuffer getPayload() {
100+
return payload.asReadOnlyBuffer();
101101
}
102102

103-
public @Nullable ByteBuffer getRawPayload() {
103+
public @NotNull ByteBuffer getRawPayload() {
104104
return payload;
105105
}
106106

@@ -211,13 +211,12 @@ public void acknowledge() {
211211

212212
@Override
213213
protected @NotNull String toAttributeString() {
214-
return "topic=" + topic + ((payload == null) ? "" : ", payload=" + payload.remaining() + "byte") + ", qos=" +
215-
qos + ", retain=" + retain + ((messageExpiryInterval == NO_MESSAGE_EXPIRY) ? "" :
216-
", messageExpiryInterval=" + messageExpiryInterval) +
217-
((payloadFormatIndicator == null) ? "" : ", payloadFormatIndicator=" + payloadFormatIndicator) +
218-
((contentType == null) ? "" : ", contentType=" + contentType) +
219-
((responseTopic == null) ? "" : ", responseTopic=" + responseTopic) +
220-
((correlationData == null) ? "" : ", correlationData=" + correlationData.remaining() + "byte") +
214+
return "topic=" + topic + ", payload=" + payload.remaining() + "byte" + ", qos=" + qos + ", retain=" + retain +
215+
(messageExpiryInterval == NO_MESSAGE_EXPIRY ? "" : ", messageExpiryInterval=" + messageExpiryInterval) +
216+
(payloadFormatIndicator == null ? "" : ", payloadFormatIndicator=" + payloadFormatIndicator) +
217+
(contentType == null ? "" : ", contentType=" + contentType) +
218+
(responseTopic == null ? "" : ", responseTopic=" + responseTopic) +
219+
(correlationData == null ? "" : ", correlationData=" + correlationData.remaining() + "byte") +
221220
StringUtil.prepend(", ", super.toAttributeString());
222221
}
223222

‎src/main/java/com/hivemq/client2/internal/mqtt/message/publish/MqttPublishBuilder.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
public abstract class MqttPublishBuilder<B extends MqttPublishBuilder<B>> {
4242

4343
@Nullable MqttTopicImpl topic;
44-
@Nullable ByteBuffer payload;
44+
@NotNull ByteBuffer payload = ByteBufferUtil.EMPTY_BYTE_BUFFER;
4545
@NotNull MqttQos qos = MqttPublish.DEFAULT_QOS;
4646
boolean retain;
4747
long messageExpiryInterval = MqttPublish.NO_MESSAGE_EXPIRY;
@@ -172,12 +172,12 @@ private static abstract class Base<B extends Base<B>> extends MqttPublishBuilder
172172
}
173173

174174
public @NotNull B payload(final byte @Nullable [] payload) {
175-
this.payload = ByteBufferUtil.wrap(payload);
175+
this.payload = ByteBuffer.wrap(Checks.notNull(payload, "Payload"));
176176
return self();
177177
}
178178

179179
public @NotNull B payload(final @Nullable ByteBuffer payload) {
180-
this.payload = ByteBufferUtil.slice(payload);
180+
this.payload = Checks.notNull(payload, "Payload").slice();
181181
return self();
182182
}
183183

@@ -270,12 +270,12 @@ private static abstract class WillBase<B extends WillBase<B>> extends MqttPublis
270270
}
271271

272272
public @NotNull B payload(final byte @Nullable [] payload) {
273-
this.payload = MqttChecks.binaryDataOrNull(payload, "Payload");
273+
this.payload = MqttChecks.binaryData(payload, "Payload");
274274
return self();
275275
}
276276

277277
public @NotNull B payload(final @Nullable ByteBuffer payload) {
278-
this.payload = MqttChecks.binaryDataOrNull(payload, "Payload");
278+
this.payload = MqttChecks.binaryData(payload, "Payload");
279279
return self();
280280
}
281281

‎src/main/java/com/hivemq/client2/internal/mqtt/message/publish/MqttWillPublish.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class MqttWillPublish extends MqttPublish implements Mqtt5WillPublish {
4040

4141
public MqttWillPublish(
4242
final @NotNull MqttTopicImpl topic,
43-
final @Nullable ByteBuffer payload,
43+
final @NotNull ByteBuffer payload,
4444
final @NotNull MqttQos qos,
4545
final boolean isRetain,
4646
final @Range(from = -1, to = UnsignedDataTypes.UNSIGNED_INT_MAX_VALUE) long messageExpiryInterval,

‎src/main/java/com/hivemq/client2/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.jetbrains.annotations.Unmodifiable;
3333

3434
import java.nio.ByteBuffer;
35-
import java.util.Optional;
3635

3736
/**
3837
* @author Silvio Giebl
@@ -46,7 +45,7 @@ public class Mqtt3PublishView implements Mqtt3Publish {
4645

4746
public static @NotNull MqttPublish delegate(
4847
final @NotNull MqttTopicImpl topic,
49-
final @Nullable ByteBuffer payload,
48+
final @NotNull ByteBuffer payload,
5049
final @NotNull MqttQos qos,
5150
final boolean retain) {
5251

@@ -63,7 +62,7 @@ public class Mqtt3PublishView implements Mqtt3Publish {
6362

6463
static @NotNull Mqtt3PublishView of(
6564
final @NotNull MqttTopicImpl topic,
66-
final @Nullable ByteBuffer payload,
65+
final @NotNull ByteBuffer payload,
6766
final @NotNull MqttQos qos,
6867
final boolean retain) {
6968

@@ -72,7 +71,7 @@ public class Mqtt3PublishView implements Mqtt3Publish {
7271

7372
static @NotNull Mqtt3PublishView willOf(
7473
final @NotNull MqttTopicImpl topic,
75-
final @Nullable ByteBuffer payload,
74+
final @NotNull ByteBuffer payload,
7675
final @NotNull MqttQos qos,
7776
final boolean retain) {
7877

@@ -101,7 +100,7 @@ private Mqtt3PublishView(final @NotNull MqttPublish delegate) {
101100
}
102101

103102
@Override
104-
public @NotNull Optional<ByteBuffer> getPayload() {
103+
public @NotNull ByteBuffer getPayload() {
105104
return delegate.getPayload();
106105
}
107106

@@ -135,9 +134,8 @@ public void acknowledge() {
135134
}
136135

137136
private @NotNull String toAttributeString() {
138-
return "topic=" + getTopic() + ((delegate.getRawPayload() == null) ? "" :
139-
", payload=" + delegate.getRawPayload().remaining() + "byte") + ", qos=" + getQos() + ", retain=" +
140-
isRetain();
137+
return "topic=" + getTopic() + ", payload=" + delegate.getRawPayload().remaining() + "byte" + ", qos=" +
138+
getQos() + ", retain=" + isRetain();
141139
}
142140

143141
@Override

0 commit comments

Comments
 (0)
Failed to load comments.