Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add epoch for connection handler to handle create producer timeout. #5571

Merged
merged 6 commits into from Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -61,6 +61,8 @@ public class Producer {
private final Topic topic;
private final ServerCnx cnx;
private final String producerName;
private final long epoch;
private final boolean isGeneratedName;
private final long producerId;
private final String appId;
private Rate msgIn;
Expand All @@ -86,11 +88,14 @@ public class Producer {
private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean isGeneratedName) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
this.producerName = checkNotNull(producerName);
this.isGeneratedName = isGeneratedName;
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
Expand Down Expand Up @@ -478,6 +483,14 @@ public boolean isNonPersistentTopic() {
return isNonPersistentTopic;
}

public long getEpoch() {
return epoch;
}

public boolean isGeneratedName() {
return isGeneratedName;
}

@VisibleForTesting
long getPendingPublishAcks() {
return pendingPublishAcks;
Expand Down
Expand Up @@ -817,6 +817,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean isGeneratedName = cmdProducer.getIsGeneratedName();
final boolean isEncrypted = cmdProducer.getEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
Expand Down Expand Up @@ -938,7 +940,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);
isEncrypted, metadata, schemaVersion, epoch, isGeneratedName);

try {
topic.addProducer(producer);
Expand Down
Expand Up @@ -425,8 +425,22 @@ public void addProducer(Producer producer) throws BrokerServiceException {
}

if (!producers.add(producer)) {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
boolean canOverwrite = false;
for (Producer existProducer : producers.values()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid iterating over the producers set? I.e. can you change the hash set to a hash map?

if (existProducer.equals(producer) && existProducer.isGeneratedName()
&& producer.isGeneratedName() && producer.getEpoch() > existProducer.getEpoch()) {
existProducer.close();
canOverwrite = true;
break;
}
}
if (canOverwrite) {
producers.remove(producer);
producers.add(producer);
} else {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
}
}

USAGE_COUNT_UPDATER.incrementAndGet(this);
Expand Down
Expand Up @@ -113,6 +113,7 @@
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -355,7 +356,7 @@ public void testAddRemoveProducer() throws Exception {
String role = "appid1";
// 1. simple add producer
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest);
role, false, null, SchemaVersion.Latest, 0, false);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);

Expand All @@ -371,7 +372,7 @@ public void testAddRemoveProducer() throws Exception {
// 3. add producer for a different topic
PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest);
role, false, null, SchemaVersion.Latest,0, false);
try {
topic.addProducer(failProducer);
fail("should have failed");
Expand All @@ -387,22 +388,69 @@ public void testAddRemoveProducer() throws Exception {
topic.removeProducer(producer); /* noop */
}

@Test
public void testProducerOverwrite() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
String role = "appid1";
Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false);
Producer producer2= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 0, false);
try {
topic.addProducer(producer1);
topic.addProducer(producer2);
fail("should have failed");
} catch (BrokerServiceException.NamingException e) {
// OK
}

Assert.assertEquals(topic.getProducers().size(), 1);

Producer producer3= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 1, true);

try {
topic.addProducer(producer3);
fail("should have failed");
} catch (BrokerServiceException.NamingException e) {
// OK
}

Assert.assertEquals(topic.getProducers().size(), 1);

topic.removeProducer(producer1);
Assert.assertEquals(topic.getProducers().size(), 0);

Producer producer4= new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest, 2, true);

topic.addProducer(producer3);
topic.addProducer(producer4);

Assert.assertEquals(topic.getProducers().size(), 1);

topic.getProducers().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 2));
}

public void testMaxProducers() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
String role = "appid1";
// 1. add producer1
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null, SchemaVersion.Latest);
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
false, null, SchemaVersion.Latest,0, false);
topic.addProducer(producer);
assertEquals(topic.getProducers().size(), 1);

// 2. add producer2
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null, SchemaVersion.Latest);
Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role,
false, null, SchemaVersion.Latest,0, false);
topic.addProducer(producer2);
assertEquals(topic.getProducers().size(), 2);

// 3. add producer3 but reached maxProducersPerTopic
try {
Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null, SchemaVersion.Latest);
Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role,
false, null, SchemaVersion.Latest,0, false);
topic.addProducer(producer3);
fail("should have failed");
} catch (BrokerServiceException e) {
Expand Down Expand Up @@ -800,7 +848,7 @@ public void testDeleteTopic() throws Exception {
// 2. delete topic with producer
topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest);
role, false, null, SchemaVersion.Latest, 0, false);
topic.addProducer(producer);

assertTrue(topic.delete().isCompletedExceptionally());
Expand Down Expand Up @@ -959,7 +1007,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
String role = "appid1";
Thread.sleep(10); /* delay to ensure that the delete gets executed first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
role, false, null, SchemaVersion.Latest);
role, false, null, SchemaVersion.Latest, 0, false);
topic.addProducer(producer);
fail("Should have failed");
} catch (BrokerServiceException e) {
Expand Down
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.impl.ProducerImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ProducerCreationTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testExactlyOnceWithProducerNameSpecified() throws PulsarClientException {
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic("testExactlyOnceWithProducerNameSpecified")
.producerName("p-name-1")
.create();

Assert.assertNotNull(producer1);

Producer<byte[]> producer2 = pulsarClient.newProducer()
.topic("testExactlyOnceWithProducerNameSpecified")
.producerName("p-name-2")
.create();

Assert.assertNotNull(producer2);

try {
pulsarClient.newProducer()
.topic("testExactlyOnceWithProducerNameSpecified")
.producerName("p-name-2")
.create();
Assert.fail("should be failed");
} catch (PulsarClientException.ProducerBusyException e) {
//ok here
}
}

@Test
public void testGeneratedNameProducerReconnect() throws PulsarClientException, InterruptedException {
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic("testGeneratedNameProducerReconnect")
.create();
Assert.assertTrue(producer.isConnected());
//simulate create producer timeout.
Thread.sleep(3000);

producer.getConnectionHandler().connectionClosed(producer.getConnectionHandler().getClientCnx());
Assert.assertFalse(producer.isConnected());
Thread.sleep(3000);
Assert.assertEquals(producer.getConnectionHandler().getEpoch(), 1);
Assert.assertTrue(producer.isConnected());
}
}
Expand Up @@ -38,6 +38,11 @@ public interface Producer<T> extends Closeable {
*/
String getProducerName();

/**
* @return the name of producer is generated or user specified
*/
boolean isGeneratedName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to expose this to producer api? I think this is an implementation detail, which should be hidden in the implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, currently no need to expose to the interface


/**
* Sends a message.
*
Expand Down
Expand Up @@ -20,19 +20,22 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionHandler {
public class ConnectionHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add @VisiableForTesting ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let me try

private static final AtomicReferenceFieldUpdater<ConnectionHandler, ClientCnx> CLIENT_CNX_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx");
@SuppressWarnings("unused")
private volatile ClientCnx clientCnx = null;

protected final HandlerState state;
protected final Backoff backoff;
protected long epoch = 0L;

interface Connection {
void connectionFailed(PulsarClientException exception);
Expand Down Expand Up @@ -94,11 +97,13 @@ protected void reconnectLater(Throwable exception) {
state.setState(State.Connecting);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
++epoch;
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}

protected void connectionClosed(ClientCnx cnx) {
@VisibleForTesting
public void connectionClosed(ClientCnx cnx) {
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!isValidStateForReconnection()) {
log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState());
Expand All @@ -110,6 +115,7 @@ protected void connectionClosed(ClientCnx cnx) {
delayMs / 1000.0);
state.client.timer().newTimeout(timeout -> {
log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName());
++epoch;
grabCnx();
}, delayMs, TimeUnit.MILLISECONDS);
}
Expand All @@ -127,7 +133,8 @@ protected boolean isRetriableError(PulsarClientException e) {
return e instanceof PulsarClientException.LookupException;
}

protected ClientCnx getClientCnx() {
@VisibleForTesting
public ClientCnx getClientCnx() {
return CLIENT_CNX_UPDATER.get(this);
}

Expand All @@ -153,5 +160,10 @@ private boolean isValidStateForReconnection() {
return false;
}

@VisibleForTesting
public long getEpoch() {
return epoch;
}

private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
}
Expand Up @@ -114,6 +114,11 @@ public String getProducerName() {
return producers.get(0).getProducerName();
}

@Override
public boolean isGeneratedName() {
return producers.get(0).isGeneratedName();
}

@Override
public long getLastSequenceId() {
// Return the highest sequence id across all partitions. This will be correct,
Expand Down