-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-25071 ReplicationServer support start ReplicationSource internal #2452
Conversation
1edb685
to
8e6639e
Compare
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@@ -721,7 +721,8 @@ message ListReplicationSinkServersRequest { | |||
} | |||
|
|||
message ListReplicationSinkServersResponse { | |||
repeated ServerName server_name = 1; | |||
required bool is_replication_server = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this right name for the attribute? The method name for a boolean should be isATTRIBUTE but the ATTRIBUTE itself should not have the 'is' prefix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, you can't change the protobuf index.... it breaks compatibility. Give the replication_server '2' and leave server_name as '1'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only for the feature branch. Not merged to any branch now.
hbase-protocol-shaded/src/main/protobuf/server/replication/ReplicationServer.proto
Show resolved
Hide resolved
@@ -118,6 +118,7 @@ | |||
private boolean fetchServersUseZk = false; | |||
private FetchServersChore fetchServersChore; | |||
private int shortOperationTimeout; | |||
private boolean isReplicationServer = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, boolean should be named replicationServer and the method to access this data member is named isReplicationServer.
@@ -295,6 +296,7 @@ private synchronized AsyncClusterConnection getPeerConnection() throws IOExcepti | |||
.createBlockingRpcChannel(master, User.getCurrent(), shortOperationTimeout)); | |||
ListReplicationSinkServersResponse resp = masterStub | |||
.listReplicationSinkServers(null, ListReplicationSinkServersRequest.newBuilder().build()); | |||
isReplicationServer = resp.getIsReplicationServer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have to do hasReplicationServer first? And then read it if present? Or maybe this defaults false if not present?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field is required. So not need to check hasReplicationServer first?
@@ -50,7 +72,7 @@ | |||
*/ | |||
@InterfaceAudience.Private | |||
@SuppressWarnings({ "deprecation"}) | |||
public class HReplicationServer extends Thread implements Server { | |||
public class HReplicationServer extends Thread implements Server, ReplicationSourceController { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like ReplicationServer already committed (why have the 'H' prefix?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only for the feature branch HBASE-24666.
|
||
private UserProvider userProvider; | ||
|
||
protected final ReplicationServerRpcServices rpcServices; | ||
private final ReplicationServerRpcServices rpcServices; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to do this? Just because we do it for HMaster and HRegionServer, it doesn't make it a good pattern. This is an Interface to pass instead of the impl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it is to mock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am ok with this. Decouple a RpcService can make the code more clearly.
private final ConcurrentMap<String, ReplicationSourceInterface> sources = new ConcurrentHashMap<>(); | ||
|
||
private final ReplicationQueueStorage queueStorage; | ||
private final ReplicationPeers replicationPeers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this stuff normally inside a ReplicationSourceManager? Or maybe the Replication instance? Can you not use these instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The key thing of this feature is here. The new HReplicationServer will be a independent server which manage some ReplicationSource.
The assumption here is that: Replication is a producer-consumer module. The "producer" RegionServer write WAL to HDFS and write WAL name to a replication queue, which can be identified by <RegionServer ServerName, QueueId>. The "consumer" ReplicationSource fetch the WAL name from the replication queue and read the WAL from HDFS and replicate to other HBase Clusters or other Systems.
When replication offload enabled, ReplicationSourceManager will only wirte the WAL name to the replication queue. But not start any ReplicationSource. HMaster get all replication queues and assign them to many ReplicationServers. Same with the region balancer, HMaster called a StartReplicationSource rpc to the ReplicationServer. Then the ReplicationServer will start a ReplicationSource thread to do the replication job.
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
Outdated
Show resolved
Hide resolved
@@ -52,15 +52,17 @@ | |||
* @param queueStorage the replication queue storage | |||
* @param replicationPeer the replication peer | |||
* @param server the server which start and run this replication source | |||
* @param producer the name of region server which produce the replication queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for the RecoveryReplicationSource only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For all ReplicationSource. The assumption here is that: all replication is a producer-consumer module.
@@ -360,8 +360,8 @@ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer | |||
MetricsSource metrics = new MetricsSource(queueId); | |||
sourceMetrics.put(queueId, metrics); | |||
// init replication source | |||
src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueId, clusterId, | |||
walFileLengthProvider, metrics); | |||
src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There will be a time when the passed in 'server' differs from server.getServerName?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps say more about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One replication queue can be identified by <Producer ServerName, QueueId>.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And one ReplicationSource is responsible for one replication queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When replication offload disabled, the ReplicationSource will be started in RegionServer inside. So the producer name is same with the Server's name.
8e6639e
to
b7e51db
Compare
🎊 +1 overall
This message was automatically generated. |
b7e51db
to
425ffb4
Compare
💔 -1 overall
This message was automatically generated. |
425ffb4
to
bb46ef9
Compare
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
6008ffe
to
662f5e1
Compare
bb46ef9
to
5a39f53
Compare
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
8a063c3
to
108dddd
Compare
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
108dddd
to
97f0c23
Compare
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
97f0c23
to
7d4b041
Compare
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@@ -224,6 +226,35 @@ public void init(Configuration conf, FileSystem fs, Path walDir, | |||
this.abortOnError = this.conf.getBoolean("replication.source.regionserver.abort", | |||
true); | |||
|
|||
if (conf.getBoolean(HConstants.REPLICATION_OFFLOAD_ENABLE_KEY, | |||
HConstants.REPLICATION_OFFLOAD_ENABLE_DEFAULT)) { | |||
fetchWALsThread = new Thread(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interrrupt the fetchWALsThread
when the ReplicationServer exits or when the peer terminates?
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
7d4b041
to
a28ec44
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
a28ec44
to
c8537ef
Compare
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
#2452) Signed-off-by: XinSun <ddupgs@gmail.com>
#2452) Signed-off-by: XinSun <ddupgs@gmail.com>
#2452) Signed-off-by: XinSun <ddupgs@gmail.com>
No description provided.