New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][transactions] Add command to list transaction coordinators #17522
[improve][transactions] Add command to list transaction coordinators #17522
Conversation
6a05d5d
to
ede029b
Compare
.thenAccept(map -> { | ||
if (map.isEmpty()) { | ||
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, | ||
"Transaction coordinator not found")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not actually a "problem" to be reported as 404.
we can simply return an empty result.
how can we get to this situation ? when the partitions are not loaded by any broker ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, removed
public void testListTransactionCoordinators() throws Exception { | ||
initTransaction(4); | ||
final Map<Integer, TransactionCoordinatorInfo> result = admin | ||
.transactions().listTransactionCoordinatorsAsync().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we use Awaitility here ?
are we guaranteed that when we reach this point all the partitions are loaded ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the initTransaction
method we wait for all the topic partitions to be loaded
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size() == coordinatorSize);
return; | ||
} | ||
Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>(); | ||
map.forEach((topicPartition, brokerServiceUrl) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should return something also for the partitions that are not currently "loaded"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* | ||
* @return the transaction coordinators list. | ||
*/ | ||
Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question (not a request for a change):
why returning a Map and not a List ?
I don't think that returning a Map has a particular advantage over returning a List
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to List
@NoArgsConstructor | ||
@AllArgsConstructor | ||
public class TransactionCoordinatorInfo { | ||
private String brokerServiceUrl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would add the id as well, it would make it easier further processing downstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed
ede029b
to
7b1659d
Compare
@nicoloboschi hi, why not add the broker address into |
The pr had no activity for 30 days, mark with Stale label. |
7b1659d
to
f3f9c78
Compare
The stats output is not needed here and they're hard to read considering you have multiple coordinators. Adding the related broker to the stat object requires a lookup request. I'd prefer to not add it @eolivelli PTAL again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/pulsarbot rerun-failure-checks |
Codecov Report
@@ Coverage Diff @@
## master #17522 +/- ##
============================================
+ Coverage 46.98% 47.07% +0.08%
- Complexity 10343 10382 +39
============================================
Files 692 692
Lines 67766 67790 +24
Branches 7259 7258 -1
============================================
+ Hits 31842 31913 +71
+ Misses 32344 32315 -29
+ Partials 3580 3562 -18
Flags with carried forward coverage won't be shown. Click here to find out more.
|
initTransaction(4); | ||
final List<TransactionCoordinatorInfo> result = admin | ||
.transactions().listTransactionCoordinatorsAsync().get(); | ||
System.out.println("result" + result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete
return getPulsarResources() | ||
.getTopicResources() | ||
.getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN); | ||
}) | ||
.thenAccept(allPartitions -> { | ||
allPartitions | ||
.stream() | ||
.filter(partition -> SystemTopicNames | ||
.isTransactionCoordinatorAssign(TopicName.get(partition))) | ||
.forEach(partition -> { | ||
final int coordinatorId = TopicName.getPartitionIndex(partition); | ||
if (!result.containsKey(coordinatorId)) { | ||
result.put(coordinatorId, | ||
new TransactionCoordinatorInfo(coordinatorId, null)); | ||
} | ||
}); | ||
asyncResponse.resume(result.values()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to check again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because there may be partitions not currently loaded by any broker but it's valuable for the user to know that exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for late response, lookupPartitionedTopicAsync has already return all the partition, it will not missing. are you test it will miss?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right. I left only the lookup PTAL
@congbobo184 I fixed the debug line and answered to your question |
8adec32
to
de0300a
Compare
…pache#17522) * [improve][transactions] Add command to list transaction coordinators url * Address pr's comment and rebase * checkstyle * remove debug * Remove useless call and fix style * style (cherry picked from commit 1db89d7)
Fixes #17513
Motivation
At the moment there's no way to list all the transaction coordinators and to understand to which broker own.
Modifications
lookup
to the TC partitioned topicbrokerServiceUrl
fieldcoordinators-list
doc-not-needed