Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
Expand Down Expand Up @@ -156,8 +157,7 @@ public void testPulsarClientCloseThenCloseTcClient() throws Exception {
try {
handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to add fail() here.

} catch (ExecutionException | InterruptedException e) {
assertTrue(e.getCause()
instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
assertTrue(e.getCause() instanceof PulsarClientException.MetaStoreHandlerHasClosedException);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,32 @@ public TransactionConflictException(String msg) {
}
}

/**
* Thrown when send request to transaction meta store but the transaction meta store handler not ready.
*/
public static class MetaStoreHandlerNotReadyException extends PulsarClientException {
public MetaStoreHandlerNotReadyException(long tcId) {
super("Transaction meta store handler for transaction meta store [" + tcId + "] not ready now.");
}

public MetaStoreHandlerNotReadyException(String message) {
super(message);
}
}

/**
* Thrown when send request to transaction meta store but the transaction meta store handler has closed.
*/
public static class MetaStoreHandlerHasClosedException extends PulsarClientException {
public MetaStoreHandlerHasClosedException(long tcId) {
super("Transaction meta store handler for transaction meta store [" + tcId + "] has closed.");
}

public MetaStoreHandlerHasClosedException(String message) {
super(message);
}
}

// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
Expand Down Expand Up @@ -972,6 +998,10 @@ public static Throwable wrap(Throwable t, String msg) {
return new MessageAcknowledgeException(msg);
} else if (t instanceof TransactionConflictException) {
return new TransactionConflictException(msg);
} else if (t instanceof MetaStoreHandlerNotReadyException) {
return new MetaStoreHandlerNotReadyException(msg);
} else if (t instanceof MetaStoreHandlerHasClosedException) {
return new MetaStoreHandlerHasClosedException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
Expand Down Expand Up @@ -1062,6 +1092,10 @@ public static PulsarClientException unwrap(Throwable t) {
newException = new MessageAcknowledgeException(msg);
} else if (cause instanceof TransactionConflictException) {
newException = new TransactionConflictException(msg);
} else if (cause instanceof MetaStoreHandlerNotReadyException) {
newException = new MetaStoreHandlerNotReadyException(msg);
} else if (cause instanceof MetaStoreHandlerHasClosedException) {
newException = new MetaStoreHandlerHasClosedException(msg);
} else if (cause instanceof TopicDoesNotExistException) {
newException = new TopicDoesNotExistException(msg);
} else if (cause instanceof ProducerFencedException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,6 @@ public MetaStoreHandlerNotExistsException(String message) {
}
}

/**
* Thrown when send request to transaction meta store but the transaction meta store handler not ready.
*/
public static class MetaStoreHandlerNotReadyException extends TransactionCoordinatorClientException {
public MetaStoreHandlerNotReadyException(long tcId) {
super("Transaction meta store handler for transaction meta store {} not ready now.");
}

public MetaStoreHandlerNotReadyException(String message) {
super(message);
}
}

public static TransactionCoordinatorClientException unwrap(Throwable t) {
if (t instanceof TransactionCoordinatorClientException) {
return (TransactionCoordinatorClientException) t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,25 +668,18 @@ private boolean checkStateAndSendRequest(OpBase<?> op) {
case Closing:
case Closed:
op.callback.completeExceptionally(
new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
"Transaction meta store handler for tcId "
+ transactionCoordinatorId
+ " is closing or closed."));
new PulsarClientException.MetaStoreHandlerHasClosedException(transactionCoordinatorId));
onResponse(op);
return false;
case Failed:
case Uninitialized:
op.callback.completeExceptionally(
new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
"Transaction meta store handler for tcId "
+ transactionCoordinatorId
+ " not connected."));
new PulsarClientException.MetaStoreHandlerNotReadyException(transactionCoordinatorId));
onResponse(op);
return false;
default:
op.callback.completeExceptionally(
new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
transactionCoordinatorId));
new PulsarClientException.MetaStoreHandlerNotReadyException(transactionCoordinatorId));
onResponse(op);
return false;
}
Expand Down