-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transaction] Transaction buffer snapshot implementation. #9490
Merged
codelipenghui
merged 20 commits into
apache:master
from
congbobo184:congbobo184_transaction_buffer_snapshot
Mar 10, 2021
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
874e7a6
[Transaction] Transaction buffer snapshot implementation.
0e96de3
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
e8c2399
Change proto to avro
ea517a5
fix the test nullpoint
688f27c
Fix the transaction buffer provider
cbb1339
fix some comment
bb8bfca
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
ab97093
fix some comment
2860c65
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
a7504fb
Add the error in cpp-client
3dc8464
Fix transaction buffer handle exception
bc53492
Fix the codestyle
d434987
Delete transaction buffer not recover exception
879a0ef
add the handle the future logic
25290d7
fix the code style
cf58bf5
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
d4f123e
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
03da520
fix the transaction test
77db43d
Merge branch 'master' into congbobo184_transaction_buffer_snapshot
fabe392
change the config name
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
86 changes: 86 additions & 0 deletions
86
...c/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.broker.service; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; | ||
import org.apache.pulsar.broker.systopic.SystemTopicClient; | ||
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader; | ||
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer; | ||
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient; | ||
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; | ||
import org.apache.pulsar.client.api.PulsarClient; | ||
import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException; | ||
import org.apache.pulsar.common.events.EventType; | ||
import org.apache.pulsar.common.naming.TopicName; | ||
import org.apache.pulsar.common.util.FutureUtil; | ||
|
||
public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService { | ||
|
||
private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients; | ||
|
||
private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; | ||
|
||
public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) { | ||
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client); | ||
this.clients = new ConcurrentHashMap<>(); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) { | ||
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync); | ||
} | ||
|
||
private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient( | ||
TopicName topicName) { | ||
TopicName systemTopicName = NamespaceEventsSystemTopicFactory | ||
.getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT); | ||
if (systemTopicName == null) { | ||
return FutureUtil.failedFuture( | ||
new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, " | ||
+ "because the topicName is null!")); | ||
} | ||
return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName, | ||
(v) -> namespaceEventsSystemTopicFactory | ||
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this))); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) { | ||
return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync); | ||
} | ||
|
||
@Override | ||
public void removeClient(TopicName topicName, | ||
TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) { | ||
if (transactionBufferSystemTopicClient.getReaders().size() == 0 | ||
&& transactionBufferSystemTopicClient.getWriters().size() == 0) { | ||
clients.remove(topicName); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() throws Exception { | ||
for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) { | ||
entry.getValue().close(); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
...oker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.broker.service; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader; | ||
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer; | ||
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient; | ||
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; | ||
import org.apache.pulsar.common.naming.TopicName; | ||
|
||
public interface TransactionBufferSnapshotService { | ||
|
||
/** | ||
* Create a transaction buffer snapshot writer. | ||
* | ||
* @param topicName {@link TopicName} the topic name | ||
* | ||
* @return {@link CompletableFuture<Writer>} return the future of writer | ||
*/ | ||
CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName); | ||
|
||
/** | ||
* Create a transaction buffer snapshot reader. | ||
* | ||
* @param topicName {@link TopicName} the topic name | ||
* | ||
* @return {@link CompletableFuture<Writer>} return the future of reader | ||
*/ | ||
CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName); | ||
|
||
/** | ||
* Remove a topic client from cache. | ||
* | ||
* @param topicName {@link TopicName} the topic name | ||
* @param transactionBufferSystemTopicClient {@link TransactionBufferSystemTopicClient} the topic client | ||
* | ||
*/ | ||
void removeClient(TopicName topicName, TransactionBufferSystemTopicClient transactionBufferSystemTopicClient); | ||
|
||
/** | ||
* Close transaction buffer snapshot service. | ||
*/ | ||
void close() throws Exception; | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need to throw a generic
Exception
?how can this error be handled downstream ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method only close pulsarService use, so I think we don't need to think too much .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay