From 2e84172181bf15881f21741af97c01bd7a4fe589 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 9 Apr 2019 16:19:13 -0700 Subject: [PATCH 1/4] Allow to configure TypedMessageBuilder through a Map conf object --- .../api/SimpleTypedProducerConsumerTest.java | 80 +++++++++++++++++++ .../client/api/TypedMessageBuilder.java | 35 ++++++++ .../client/impl/TypedMessageBuilderImpl.java | 27 +++++++ .../impl/conf/ConfigurationDataUtils.java | 34 ++------ .../pulsar/client/util/TypeCheckUtil.java | 33 ++++++++ 5 files changed, 183 insertions(+), 26 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 050db394fb0e5..c1ef47584343c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -18,15 +18,23 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.time.Clock; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; + import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -623,4 +631,76 @@ public void testAutoBytesProducer() throws Exception { log.info("-- Exiting {} test --", methodName); } + + @Test + public void testMessageBuilderLoadConf() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-subscriber-name") + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Map properties = new HashMap<>(); + properties.put("a", "1"); + properties.put("b", "2"); + + Map msgConf = new HashMap<>(); + msgConf.put("key", "key-1"); + msgConf.put("properties", properties); + msgConf.put("eventTime", 1234); + msgConf.put("sequenceId", 5); + msgConf.put("replicationClusters", Lists.newArrayList("a", "b", "c")); + msgConf.put("disableReplication", false); + + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + + + Message msg = consumer.receive(); + assertEquals(msg.getKey(), "key-1"); + assertEquals(msg.getProperties().get("a"), "1"); + assertEquals(msg.getProperties().get("b"), "2"); + assertEquals(msg.getEventTime(), 1234); + assertEquals(msg.getSequenceId(), 5); + + consumer.acknowledge(msg); + + // Try with invalid confs + msgConf.clear(); + msgConf.put("nonExistingKey", "key-1"); + + try { + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + fail("Should have failed"); + } catch (RuntimeException e) { + // expected + } + + // Try with invalid type + msgConf.clear(); + msgConf.put("eventTime", "hello"); + + try { + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + fail("Should have failed"); + } catch (RuntimeException e) { + // expected + } + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java index 423c08019d262..cb00f0138a85b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java @@ -174,4 +174,39 @@ public interface TypedMessageBuilder extends Serializable { * @return the message builder instance */ TypedMessageBuilder disableReplication(); + + /** + * Configure the {@link TypedMessageBuilder} from a config map, as an alternative compared + * to call the individual builder methods. + *

+ * The "value" of the message itself cannot be set on the config map. + *

+ * Example: + * + *

{@code
+     * Map conf = new HashMap<>();
+     * conf.put("key", "my-key");
+     * conf.put("eventTime", System.currentTimeMillis());
+     *
+     * producer.newMessage()
+     *             .value("my-message")
+     *             .loadConf(conf)
+     *             .send();
+     * }
+ * + * The available options are: + * + * + * + * + * + * + * + * + *
NameTypeDoc
{@code key}{@code String}{@link #key(String)}
{@code properties}{@code Map}{@link #properties(Map)}
{@code eventTime}{@code long}{@link #eventTime(long)}
{@code sequenceId}{@code long}{@link #sequenceId(long)}
{@code replicationClusters}{@code List}{@link #replicationClusters(List)}
{@code disableReplication}{@code boolean}{@link #disableReplication()}
+ * + * @param config a map with the configuration options for the message + * @return the message builder instance + */ + TypedMessageBuilder loadConf(Map config); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index b13423ec1f448..1c483b9d1fa2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.client.util.TypeCheckUtil.checkType; import com.google.common.base.Preconditions; @@ -130,6 +131,32 @@ public TypedMessageBuilder disableReplication() { return this; } + @SuppressWarnings("unchecked") + @Override + public TypedMessageBuilder loadConf(Map config) { + config.forEach((key, value) -> { + if (key.equals("key")) { + this.key(checkType(value, String.class)); + } else if (key.equals("properties")) { + this.properties(checkType(value, Map.class)); + } else if (key.equals("eventTime")) { + this.eventTime(checkType(value, Number.class).longValue()); + } else if (key.equals("sequenceId")) { + this.sequenceId(checkType(value, Number.class).longValue()); + } else if (key.equals("replicationClusters")) { + this.replicationClusters(checkType(value, List.class)); + } else if (key.equals("disableReplication")) { + boolean disableReplication = checkType(value, Boolean.class); + if (disableReplication) { + this.disableReplication(); + } + } else { + throw new RuntimeException("Invalid message config key '" + key + "'"); + } + }); + return this; + } + public MessageMetadata.Builder getMetadataBuilder() { return msgMetadataBuilder; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java index 4523939e07e56..20254b2705a9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java @@ -18,45 +18,27 @@ */ package org.apache.pulsar.client.impl.conf; -import com.fasterxml.jackson.annotation.JsonInclude.Include; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; -import io.netty.util.concurrent.FastThreadLocal; + import java.io.IOException; import java.util.Map; +import lombok.experimental.UtilityClass; + +import org.apache.pulsar.common.util.ObjectMapperFactory; + /** * Utils for loading configuration data. */ +@UtilityClass public final class ConfigurationDataUtils { - public static ObjectMapper create() { - ObjectMapper mapper = new ObjectMapper(); - // forward compatibility for the properties may go away in the future - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); - mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, false); - mapper.setSerializationInclusion(Include.NON_NULL); - return mapper; - } - - private static final FastThreadLocal mapper = new FastThreadLocal() { - @Override - protected ObjectMapper initialValue() throws Exception { - return create(); - } - }; - - public static ObjectMapper getThreadLocal() { - return mapper.get(); - } - - private ConfigurationDataUtils() {} - + @SuppressWarnings("unchecked") public static T loadData(Map config, T existingData, Class dataCls) { - ObjectMapper mapper = getThreadLocal(); + ObjectMapper mapper = ObjectMapperFactory.getThreadLocal(); try { String existingConfigJson = mapper.writeValueAsString(existingData); Map existingConfig = mapper.readValue(existingConfigJson, Map.class); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java new file mode 100644 index 0000000000000..494c964ed71ed --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.util; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class TypeCheckUtil { + @SuppressWarnings("unchecked") + public static T checkType(Object o, Class clazz) { + if (!clazz.isInstance(o)) { + throw new RuntimeException( + String.format("Invalid object type '%s' when exepcting '%s'", o.getClass(), clazz)); + } + return (T) o; + } +} From 8885b0cbe78c4a6188c4c89892aa62c0f1a63310 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 9 Apr 2019 16:51:33 -0700 Subject: [PATCH 2/4] Use constants for message confs --- .../api/SimpleTypedProducerConsumerTest.java | 2 +- .../client/api/TypedMessageBuilder.java | 21 ++++++++++++------- .../client/impl/TypedMessageBuilderImpl.java | 12 +++++------ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index c1ef47584343c..97438b7b3e5e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -634,7 +634,7 @@ public void testAutoBytesProducer() throws Exception { @Test public void testMessageBuilderLoadConf() throws Exception { - String topic = "persistent://my-property/use/my-ns/my-topic-" + System.nanoTime(); + String topic = "my-topic-" + System.nanoTime(); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java index cb00f0138a85b..a1e2f2da4f6aa 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java @@ -196,17 +196,24 @@ public interface TypedMessageBuilder extends Serializable { * * The available options are: * - * - * - * - * - * - * - * + * + * + * + * + * + * + * *
NameTypeDoc
{@code key}{@code String}{@link #key(String)}
{@code properties}{@code Map}{@link #properties(Map)}
{@code eventTime}{@code long}{@link #eventTime(long)}
{@code sequenceId}{@code long}{@link #sequenceId(long)}
{@code replicationClusters}{@code List}{@link #replicationClusters(List)}
{@code disableReplication}{@code boolean}{@link #disableReplication()}
ConstantNameTypeDoc
{@link #CONF_KEY}{@code key}{@code String}{@link #key(String)}
{@link #CONF_PROPERTIES}{@code properties}{@code Map}{@link #properties(Map)}
{@link #CONF_EVENT_TIME}{@code eventTime}{@code long}{@link #eventTime(long)}
{@link #CONF_SEQUENCE_ID}{@code sequenceId}{@code long}{@link #sequenceId(long)}
{@link #CONF_REPLICATION_CLUSTERS}{@code replicationClusters}{@code List}{@link #replicationClusters(List)}
{@link #CONF_DISABLE_REPLICATION}{@code disableReplication}{@code boolean}{@link #disableReplication()}
* * @param config a map with the configuration options for the message * @return the message builder instance */ TypedMessageBuilder loadConf(Map config); + + static final String CONF_KEY = "key"; + static final String CONF_PROPERTIES = "properties"; + static final String CONF_EVENT_TIME = "eventTime"; + static final String CONF_SEQUENCE_ID = "sequenceId"; + static final String CONF_REPLICATION_CLUSTERS = "replicationClusters"; + static final String CONF_DISABLE_REPLICATION = "disableReplication"; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 1c483b9d1fa2c..758c08ca673c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -135,17 +135,17 @@ public TypedMessageBuilder disableReplication() { @Override public TypedMessageBuilder loadConf(Map config) { config.forEach((key, value) -> { - if (key.equals("key")) { + if (key.equals(CONF_KEY)) { this.key(checkType(value, String.class)); - } else if (key.equals("properties")) { + } else if (key.equals(CONF_PROPERTIES)) { this.properties(checkType(value, Map.class)); - } else if (key.equals("eventTime")) { + } else if (key.equals(CONF_EVENT_TIME)) { this.eventTime(checkType(value, Number.class).longValue()); - } else if (key.equals("sequenceId")) { + } else if (key.equals(CONF_SEQUENCE_ID)) { this.sequenceId(checkType(value, Number.class).longValue()); - } else if (key.equals("replicationClusters")) { + } else if (key.equals(CONF_REPLICATION_CLUSTERS)) { this.replicationClusters(checkType(value, List.class)); - } else if (key.equals("disableReplication")) { + } else if (key.equals(CONF_DISABLE_REPLICATION)) { boolean disableReplication = checkType(value, Boolean.class); if (disableReplication) { this.disableReplication(); From b5792b6448790772ed75a64b3d6865f924a8e525 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 9 Apr 2019 17:33:17 -0700 Subject: [PATCH 3/4] Reverted previous change --- .../impl/conf/ConfigurationDataUtils.java | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java index 20254b2705a9a..4523939e07e56 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtils.java @@ -18,27 +18,45 @@ */ package org.apache.pulsar.client.impl.conf; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; - +import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.util.Map; -import lombok.experimental.UtilityClass; - -import org.apache.pulsar.common.util.ObjectMapperFactory; - /** * Utils for loading configuration data. */ -@UtilityClass public final class ConfigurationDataUtils { - @SuppressWarnings("unchecked") + public static ObjectMapper create() { + ObjectMapper mapper = new ObjectMapper(); + // forward compatibility for the properties may go away in the future + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, false); + mapper.setSerializationInclusion(Include.NON_NULL); + return mapper; + } + + private static final FastThreadLocal mapper = new FastThreadLocal() { + @Override + protected ObjectMapper initialValue() throws Exception { + return create(); + } + }; + + public static ObjectMapper getThreadLocal() { + return mapper.get(); + } + + private ConfigurationDataUtils() {} + public static T loadData(Map config, T existingData, Class dataCls) { - ObjectMapper mapper = ObjectMapperFactory.getThreadLocal(); + ObjectMapper mapper = getThreadLocal(); try { String existingConfigJson = mapper.writeValueAsString(existingData); Map existingConfig = mapper.readValue(existingConfigJson, Map.class); From 90aa99998afe3d94396e4257601f6d3b5e574dbd Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 9 Apr 2019 17:35:24 -0700 Subject: [PATCH 4/4] Use Long instead of Number --- .../pulsar/client/api/SimpleTypedProducerConsumerTest.java | 4 ++-- .../apache/pulsar/client/impl/TypedMessageBuilderImpl.java | 4 ++-- .../java/org/apache/pulsar/client/util/TypeCheckUtil.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 97438b7b3e5e3..586f72b49dec0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -654,8 +654,8 @@ public void testMessageBuilderLoadConf() throws Exception { Map msgConf = new HashMap<>(); msgConf.put("key", "key-1"); msgConf.put("properties", properties); - msgConf.put("eventTime", 1234); - msgConf.put("sequenceId", 5); + msgConf.put("eventTime", 1234L); + msgConf.put("sequenceId", 5L); msgConf.put("replicationClusters", Lists.newArrayList("a", "b", "c")); msgConf.put("disableReplication", false); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index 758c08ca673c6..fb18c0ec907eb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -140,9 +140,9 @@ public TypedMessageBuilder loadConf(Map config) { } else if (key.equals(CONF_PROPERTIES)) { this.properties(checkType(value, Map.class)); } else if (key.equals(CONF_EVENT_TIME)) { - this.eventTime(checkType(value, Number.class).longValue()); + this.eventTime(checkType(value, Long.class)); } else if (key.equals(CONF_SEQUENCE_ID)) { - this.sequenceId(checkType(value, Number.class).longValue()); + this.sequenceId(checkType(value, Long.class)); } else if (key.equals(CONF_REPLICATION_CLUSTERS)) { this.replicationClusters(checkType(value, List.class)); } else if (key.equals(CONF_DISABLE_REPLICATION)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java index 494c964ed71ed..cbabdfe1e5412 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java @@ -26,7 +26,7 @@ public class TypeCheckUtil { public static T checkType(Object o, Class clazz) { if (!clazz.isInstance(o)) { throw new RuntimeException( - String.format("Invalid object type '%s' when exepcting '%s'", o.getClass(), clazz)); + String.format("Invalid object type '%s' when exepcting '%s'", o.getClass().getName(), clazz.getName())); } return (T) o; }