Skip to content

Commit dd70cc3

Browse files
committed
HBASE-19073 Cleanup CoordinatedStateManager
- Merged BaseCSM class into CSM interface - Removed config hbase.coordinated.state.manager.class - Since state manager is not pluggable anymore, we don't need start/stop/initialize to setup unknown classes. Our internal ZkCSM now requires Server in constructor itself. Makes the dependency clearer too. - Removed CSM from HRegionServer and HMaster constructor. Although it's a step back from dependency injection, but it's more consistent with our current (not good) pattern where we initialize everything in the ctor itself. Change-Id: Ifca06bb354adec5b11ea1bad4707e014410491fc
1 parent eee3b01 commit dd70cc3

File tree

46 files changed

+158
-405
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+158
-405
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import org.apache.commons.logging.Log;
2727
import org.apache.commons.logging.LogFactory;
2828
import org.apache.hadoop.conf.Configuration;
29-
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
3029
import org.apache.hadoop.hbase.ServerName;
3130
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
3231
import org.apache.hadoop.hbase.backup.impl.BackupManager;
32+
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
3333
import org.apache.yetus.audience.InterfaceAudience;
34-
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
34+
import org.apache.hadoop.hbase.CoordinatedStateManager;
3535
import org.apache.hadoop.hbase.errorhandling.ForeignException;
3636
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
3737
import org.apache.hadoop.hbase.master.MasterServices;
@@ -95,10 +95,7 @@ public void initialize(MasterServices master, MetricsMaster metricsMaster)
9595

9696
// setup the default procedure coordinator
9797
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
98-
BaseCoordinatedStateManager coordManager =
99-
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
100-
.getCoordinatedStateManager(master.getConfiguration());
101-
coordManager.initialize(master);
98+
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master);
10299
ProcedureCoordinatorRpcs comms =
103100
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
104101
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
2626
import org.apache.hadoop.conf.Configuration;
27-
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
27+
import org.apache.hadoop.hbase.CoordinatedStateManager;
2828
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
2929
import org.apache.hadoop.hbase.backup.impl.BackupManager;
3030
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
31+
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
3132
import org.apache.yetus.audience.InterfaceAudience;
32-
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
3333
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
3434
import org.apache.hadoop.hbase.procedure.ProcedureMember;
3535
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@@ -159,12 +159,8 @@ public void initialize(RegionServerServices rss) throws KeeperException {
159159
+ " setting");
160160
return;
161161
}
162-
BaseCoordinatedStateManager coordManager =
163-
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory.
164-
getCoordinatedStateManager(rss.getConfiguration());
165-
coordManager.initialize(rss);
166-
this.memberRpcs =
167-
coordManager
162+
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
163+
this.memberRpcs = coordManager
168164
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
169165

170166
// read in the backup handler configuration properties

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,10 +1209,6 @@ public enum OperationStatusCode {
12091209

12101210
public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
12111211

1212-
/** Config for pluggable consensus provider */
1213-
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
1214-
"hbase.coordinated.state.manager.class";
1215-
12161212
/** Configuration key for SplitLog manager timeout */
12171213
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";
12181214

@@ -1298,7 +1294,7 @@ public enum OperationStatusCode {
12981294

12991295
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
13001296
"hbase.canary.write.table.check.period";
1301-
1297+
13021298
public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled";
13031299

13041300
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase;
1919

20+
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
21+
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
22+
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
23+
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
2024
import org.apache.yetus.audience.InterfaceAudience;
25+
import org.apache.zookeeper.KeeperException;
26+
27+
import java.io.IOException;
2128

2229
/**
2330
* Implementations of this interface will keep and return to clients
@@ -28,31 +35,27 @@
2835
* For each coarse-grained area of operations there will be a separate
2936
* interface with implementation, providing API for relevant operations
3037
* requiring coordination.
31-
*
32-
* Property hbase.coordinated.state.manager.class in hbase-site.xml controls
33-
* which provider to use.
3438
*/
3539
@InterfaceAudience.Private
3640
public interface CoordinatedStateManager {
37-
3841
/**
39-
* Initialize coordinated state management service.
40-
* @param server server instance to run within.
42+
* Method to retrieve coordination for split log worker
4143
*/
42-
void initialize(Server server);
44+
SplitLogWorkerCoordination getSplitLogWorkerCoordination();
4345

4446
/**
45-
* Starts service.
47+
* Method to retrieve coordination for split log manager
4648
*/
47-
void start();
48-
49+
SplitLogManagerCoordination getSplitLogManagerCoordination();
4950
/**
50-
* Stops service.
51+
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
5152
*/
52-
void stop();
53+
ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
54+
throws IOException;
5355

5456
/**
55-
* @return instance of Server coordinated state manager runs within
57+
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
5658
*/
57-
Server getServer();
59+
ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;
60+
5861
}

hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

hbase-server/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -173,14 +173,8 @@ public JVMClusterUtil.RegionServerThread addRegionServer(
173173
// Create each regionserver with its own Configuration instance so each has
174174
// its Connection instance rather than share (see HBASE_INSTANCES down in
175175
// the guts of ConnectionManager).
176-
177-
// Also, create separate CoordinatedStateManager instance per Server.
178-
// This is special case when we have to have more than 1 CoordinatedStateManager
179-
// within 1 process.
180-
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
181-
182176
JVMClusterUtil.RegionServerThread rst =
183-
JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf
177+
JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
184178
.getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);
185179

186180
this.regionThreads.add(rst);
@@ -208,13 +202,7 @@ public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
208202
// Create each master with its own Configuration instance so each has
209203
// its Connection instance rather than share (see HBASE_INSTANCES down in
210204
// the guts of ConnectionManager.
211-
212-
// Also, create separate CoordinatedStateManager instance per Server.
213-
// This is special case when we have to have more than 1 CoordinatedStateManager
214-
// within 1 process.
215-
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
216-
217-
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
205+
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
218206
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
219207
this.masterThreads.add(mt);
220208
return mt;

hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java

Lines changed: 0 additions & 76 deletions
This file was deleted.

hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public interface SplitLogManagerCoordination {
5858
/**
5959
* Detail class that shares data between coordination and split log manager
6060
*/
61-
public static class SplitLogManagerDetails {
61+
class SplitLogManagerDetails {
6262
final private ConcurrentMap<String, Task> tasks;
6363
final private MasterServices master;
6464
final private Set<String> failedDeletions;
@@ -156,7 +156,7 @@ void markRegionsRecovering(final ServerName serverName, Set<RegionInfo> userRegi
156156
* @throws InterruptedIOException
157157
* @throws IOException in case of failure
158158
*/
159-
void setRecoveryMode(boolean b) throws InterruptedIOException, IOException;
159+
void setRecoveryMode(boolean b) throws IOException;
160160

161161
/**
162162
* Removes known stale servers

hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
5050
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
5151
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
52+
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
5253
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
5354
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
5455
import org.apache.hadoop.hbase.wal.WALSplitter;
@@ -101,22 +102,21 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
101102

102103
private boolean isDrainingDone = false;
103104

104-
public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager,
105-
ZooKeeperWatcher watcher) {
105+
public ZKSplitLogManagerCoordination(Configuration conf, ZooKeeperWatcher watcher) {
106106
super(watcher);
107+
this.conf = conf;
107108
taskFinisher = new TaskFinisher() {
108109
@Override
109110
public Status finish(ServerName workerName, String logfile) {
110111
try {
111-
WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration());
112+
WALSplitter.finishSplitLogFile(logfile, conf);
112113
} catch (IOException e) {
113114
LOG.warn("Could not finish splitting of log file " + logfile, e);
114115
return Status.ERR;
115116
}
116117
return Status.DONE;
117118
}
118119
};
119-
this.conf = manager.getServer().getConfiguration();
120120
}
121121

122122
@Override
@@ -1122,6 +1122,7 @@ public long getLastRecoveryTime() {
11221122
/**
11231123
* Temporary function that is used by unit tests only
11241124
*/
1125+
@VisibleForTesting
11251126
public void setIgnoreDeleteForTesting(boolean b) {
11261127
ignoreZKDeleteForTesting = b;
11271128
}

hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import java.io.IOException;
2121

22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.CoordinatedStateManager;
2224
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
2325
import org.apache.hadoop.hbase.Server;
26+
import org.apache.hadoop.hbase.ServerName;
2427
import org.apache.yetus.audience.InterfaceAudience;
2528
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
2629
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
@@ -33,24 +36,16 @@
3336
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
3437
*/
3538
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
36-
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
37-
protected Server server;
39+
public class ZkCoordinatedStateManager implements CoordinatedStateManager {
3840
protected ZooKeeperWatcher watcher;
3941
protected SplitLogWorkerCoordination splitLogWorkerCoordination;
4042
protected SplitLogManagerCoordination splitLogManagerCoordination;
4143

42-
@Override
43-
public void initialize(Server server) {
44-
this.server = server;
44+
public ZkCoordinatedStateManager(Server server) {
4545
this.watcher = server.getZooKeeper();
46-
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher);
47-
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher);
48-
49-
}
50-
51-
@Override
52-
public Server getServer() {
53-
return server;
46+
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
47+
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(server.getConfiguration(),
48+
watcher);
5449
}
5550

5651
@Override

0 commit comments

Comments
 (0)