diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 050db394fb0e5..586f72b49dec0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -18,15 +18,23 @@ */ package org.apache.pulsar.client.api; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.time.Clock; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; + import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -623,4 +631,76 @@ public void testAutoBytesProducer() throws Exception { log.info("-- Exiting {} test --", methodName); } + + @Test + public void testMessageBuilderLoadConf() throws Exception { + String topic = "my-topic-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-subscriber-name") + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Map properties = new HashMap<>(); + properties.put("a", "1"); + properties.put("b", "2"); + + Map msgConf = new HashMap<>(); + msgConf.put("key", "key-1"); + msgConf.put("properties", properties); + msgConf.put("eventTime", 1234L); + msgConf.put("sequenceId", 5L); + msgConf.put("replicationClusters", Lists.newArrayList("a", "b", "c")); + msgConf.put("disableReplication", false); + + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + + + Message msg = consumer.receive(); + assertEquals(msg.getKey(), "key-1"); + assertEquals(msg.getProperties().get("a"), "1"); + assertEquals(msg.getProperties().get("b"), "2"); + assertEquals(msg.getEventTime(), 1234); + assertEquals(msg.getSequenceId(), 5); + + consumer.acknowledge(msg); + + // Try with invalid confs + msgConf.clear(); + msgConf.put("nonExistingKey", "key-1"); + + try { + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + fail("Should have failed"); + } catch (RuntimeException e) { + // expected + } + + // Try with invalid type + msgConf.clear(); + msgConf.put("eventTime", "hello"); + + try { + producer.newMessage() + .value("my-message") + .loadConf(msgConf) + .send(); + fail("Should have failed"); + } catch (RuntimeException e) { + // expected + } + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java index 423c08019d262..a1e2f2da4f6aa 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TypedMessageBuilder.java @@ -174,4 +174,46 @@ public interface TypedMessageBuilder extends Serializable { * @return the message builder instance */ TypedMessageBuilder disableReplication(); + + /** + * Configure the {@link TypedMessageBuilder} from a config map, as an alternative compared + * to call the individual builder methods. + *

+ * The "value" of the message itself cannot be set on the config map. + *

+ * Example: + * + *

{@code
+     * Map conf = new HashMap<>();
+     * conf.put("key", "my-key");
+     * conf.put("eventTime", System.currentTimeMillis());
+     *
+     * producer.newMessage()
+     *             .value("my-message")
+     *             .loadConf(conf)
+     *             .send();
+     * }
+ * + * The available options are: + * + * + * + * + * + * + * + * + *
ConstantNameTypeDoc
{@link #CONF_KEY}{@code key}{@code String}{@link #key(String)}
{@link #CONF_PROPERTIES}{@code properties}{@code Map}{@link #properties(Map)}
{@link #CONF_EVENT_TIME}{@code eventTime}{@code long}{@link #eventTime(long)}
{@link #CONF_SEQUENCE_ID}{@code sequenceId}{@code long}{@link #sequenceId(long)}
{@link #CONF_REPLICATION_CLUSTERS}{@code replicationClusters}{@code List}{@link #replicationClusters(List)}
{@link #CONF_DISABLE_REPLICATION}{@code disableReplication}{@code boolean}{@link #disableReplication()}
+ * + * @param config a map with the configuration options for the message + * @return the message builder instance + */ + TypedMessageBuilder loadConf(Map config); + + static final String CONF_KEY = "key"; + static final String CONF_PROPERTIES = "properties"; + static final String CONF_EVENT_TIME = "eventTime"; + static final String CONF_SEQUENCE_ID = "sequenceId"; + static final String CONF_REPLICATION_CLUSTERS = "replicationClusters"; + static final String CONF_DISABLE_REPLICATION = "disableReplication"; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java index b13423ec1f448..fb18c0ec907eb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.client.util.TypeCheckUtil.checkType; import com.google.common.base.Preconditions; @@ -130,6 +131,32 @@ public TypedMessageBuilder disableReplication() { return this; } + @SuppressWarnings("unchecked") + @Override + public TypedMessageBuilder loadConf(Map config) { + config.forEach((key, value) -> { + if (key.equals(CONF_KEY)) { + this.key(checkType(value, String.class)); + } else if (key.equals(CONF_PROPERTIES)) { + this.properties(checkType(value, Map.class)); + } else if (key.equals(CONF_EVENT_TIME)) { + this.eventTime(checkType(value, Long.class)); + } else if (key.equals(CONF_SEQUENCE_ID)) { + this.sequenceId(checkType(value, Long.class)); + } else if (key.equals(CONF_REPLICATION_CLUSTERS)) { + this.replicationClusters(checkType(value, List.class)); + } else if (key.equals(CONF_DISABLE_REPLICATION)) { + boolean disableReplication = checkType(value, Boolean.class); + if (disableReplication) { + this.disableReplication(); + } + } else { + throw new RuntimeException("Invalid message config key '" + key + "'"); + } + }); + return this; + } + public MessageMetadata.Builder getMetadataBuilder() { return msgMetadataBuilder; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java new file mode 100644 index 0000000000000..cbabdfe1e5412 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TypeCheckUtil.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.util; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class TypeCheckUtil { + @SuppressWarnings("unchecked") + public static T checkType(Object o, Class clazz) { + if (!clazz.isInstance(o)) { + throw new RuntimeException( + String.format("Invalid object type '%s' when exepcting '%s'", o.getClass().getName(), clazz.getName())); + } + return (T) o; + } +}