Skip to content

Commit

Permalink
HBASE-19073 Cleanup CoordinatedStateManager
Browse files Browse the repository at this point in the history
- Merged BaseCSM class into CSM interface
- Removed config hbase.coordinated.state.manager.class
- Since state manager is not pluggable anymore, we don't need start/stop/initialize to setup unknown classes. Our internal ZkCSM now requires Server in constructor itself. Makes the dependency clearer too.
- Removed CSM from HRegionServer and HMaster constructor. Although it's a step back from dependency injection, but it's more consistent with our current (not good)  pattern where we initialize everything in the ctor itself.

Change-Id: Ifca06bb354adec5b11ea1bad4707e014410491fc
  • Loading branch information
apeksharma committed Oct 25, 2017
1 parent eee3b01 commit dd70cc3
Show file tree
Hide file tree
Showing 46 changed files with 158 additions and 405 deletions.
Expand Up @@ -26,12 +26,12 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
Expand Down Expand Up @@ -95,10 +95,7 @@ public void initialize(MasterServices master, MetricsMaster metricsMaster)


// setup the default procedure coordinator // setup the default procedure coordinator
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
BaseCoordinatedStateManager coordManager = CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master);
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
.getCoordinatedStateManager(master.getConfiguration());
coordManager.initialize(master);
ProcedureCoordinatorRpcs comms = ProcedureCoordinatorRpcs comms =
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
Expand Down
Expand Up @@ -24,12 +24,12 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
Expand Down Expand Up @@ -159,12 +159,8 @@ public void initialize(RegionServerServices rss) throws KeeperException {
+ " setting"); + " setting");
return; return;
} }
BaseCoordinatedStateManager coordManager = CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory. this.memberRpcs = coordManager
getCoordinatedStateManager(rss.getConfiguration());
coordManager.initialize(rss);
this.memberRpcs =
coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);


// read in the backup handler configuration properties // read in the backup handler configuration properties
Expand Down
Expand Up @@ -1209,10 +1209,6 @@ public enum OperationStatusCode {


public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10; public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;


/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";

/** Configuration key for SplitLog manager timeout */ /** Configuration key for SplitLog manager timeout */
public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout";


Expand Down Expand Up @@ -1298,7 +1294,7 @@ public enum OperationStatusCode {


public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY = public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
"hbase.canary.write.table.check.period"; "hbase.canary.write.table.check.period";

public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled"; public static final String HBASE_CANARY_READ_RAW_SCAN_KEY = "hbase.canary.read.raw.enabled";


/** /**
Expand Down
Expand Up @@ -17,7 +17,14 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;


import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

import java.io.IOException;


/** /**
* Implementations of this interface will keep and return to clients * Implementations of this interface will keep and return to clients
Expand All @@ -28,31 +35,27 @@
* For each coarse-grained area of operations there will be a separate * For each coarse-grained area of operations there will be a separate
* interface with implementation, providing API for relevant operations * interface with implementation, providing API for relevant operations
* requiring coordination. * requiring coordination.
*
* Property hbase.coordinated.state.manager.class in hbase-site.xml controls
* which provider to use.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface CoordinatedStateManager { public interface CoordinatedStateManager {

/** /**
* Initialize coordinated state management service. * Method to retrieve coordination for split log worker
* @param server server instance to run within.
*/ */
void initialize(Server server); SplitLogWorkerCoordination getSplitLogWorkerCoordination();


/** /**
* Starts service. * Method to retrieve coordination for split log manager
*/ */
void start(); SplitLogManagerCoordination getSplitLogManagerCoordination();

/** /**
* Stops service. * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/ */
void stop(); ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
throws IOException;


/** /**
* @return instance of Server coordinated state manager runs within * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/ */
Server getServer(); ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;

} }

This file was deleted.

Expand Up @@ -173,14 +173,8 @@ public JVMClusterUtil.RegionServerThread addRegionServer(
// Create each regionserver with its own Configuration instance so each has // Create each regionserver with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in // its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager). // the guts of ConnectionManager).

// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);

JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, cp, (Class<? extends HRegionServer>) conf JVMClusterUtil.createRegionServerThread(config, (Class<? extends HRegionServer>) conf
.getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index); .getClass(HConstants.REGION_SERVER_IMPL, this.regionServerClass), index);


this.regionThreads.add(rst); this.regionThreads.add(rst);
Expand Down Expand Up @@ -208,13 +202,7 @@ public JVMClusterUtil.MasterThread addMaster(Configuration c, final int index)
// Create each master with its own Configuration instance so each has // Create each master with its own Configuration instance so each has
// its Connection instance rather than share (see HBASE_INSTANCES down in // its Connection instance rather than share (see HBASE_INSTANCES down in
// the guts of ConnectionManager. // the guts of ConnectionManager.

JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c,
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);

JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index); (Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);
this.masterThreads.add(mt); this.masterThreads.add(mt);
return mt; return mt;
Expand Down

This file was deleted.

Expand Up @@ -58,7 +58,7 @@ public interface SplitLogManagerCoordination {
/** /**
* Detail class that shares data between coordination and split log manager * Detail class that shares data between coordination and split log manager
*/ */
public static class SplitLogManagerDetails { class SplitLogManagerDetails {
final private ConcurrentMap<String, Task> tasks; final private ConcurrentMap<String, Task> tasks;
final private MasterServices master; final private MasterServices master;
final private Set<String> failedDeletions; final private Set<String> failedDeletions;
Expand Down Expand Up @@ -156,7 +156,7 @@ void markRegionsRecovering(final ServerName serverName, Set<RegionInfo> userRegi
* @throws InterruptedIOException * @throws InterruptedIOException
* @throws IOException in case of failure * @throws IOException in case of failure
*/ */
void setRecoveryMode(boolean b) throws InterruptedIOException, IOException; void setRecoveryMode(boolean b) throws IOException;


/** /**
* Removes known stale servers * Removes known stale servers
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter;
Expand Down Expand Up @@ -101,22 +102,21 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements


private boolean isDrainingDone = false; private boolean isDrainingDone = false;


public ZKSplitLogManagerCoordination(final CoordinatedStateManager manager, public ZKSplitLogManagerCoordination(Configuration conf, ZooKeeperWatcher watcher) {
ZooKeeperWatcher watcher) {
super(watcher); super(watcher);
this.conf = conf;
taskFinisher = new TaskFinisher() { taskFinisher = new TaskFinisher() {
@Override @Override
public Status finish(ServerName workerName, String logfile) { public Status finish(ServerName workerName, String logfile) {
try { try {
WALSplitter.finishSplitLogFile(logfile, manager.getServer().getConfiguration()); WALSplitter.finishSplitLogFile(logfile, conf);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile, e); LOG.warn("Could not finish splitting of log file " + logfile, e);
return Status.ERR; return Status.ERR;
} }
return Status.DONE; return Status.DONE;
} }
}; };
this.conf = manager.getServer().getConfiguration();
} }


@Override @Override
Expand Down Expand Up @@ -1122,6 +1122,7 @@ public long getLastRecoveryTime() {
/** /**
* Temporary function that is used by unit tests only * Temporary function that is used by unit tests only
*/ */
@VisibleForTesting
public void setIgnoreDeleteForTesting(boolean b) { public void setIgnoreDeleteForTesting(boolean b) {
ignoreZKDeleteForTesting = b; ignoreZKDeleteForTesting = b;
} }
Expand Down
Expand Up @@ -19,8 +19,11 @@


import java.io.IOException; import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
Expand All @@ -33,24 +36,16 @@
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { public class ZkCoordinatedStateManager implements CoordinatedStateManager {
protected Server server;
protected ZooKeeperWatcher watcher; protected ZooKeeperWatcher watcher;
protected SplitLogWorkerCoordination splitLogWorkerCoordination; protected SplitLogWorkerCoordination splitLogWorkerCoordination;
protected SplitLogManagerCoordination splitLogManagerCoordination; protected SplitLogManagerCoordination splitLogManagerCoordination;


@Override public ZkCoordinatedStateManager(Server server) {
public void initialize(Server server) {
this.server = server;
this.watcher = server.getZooKeeper(); this.watcher = server.getZooKeeper();
splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(this, watcher); splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
splitLogManagerCoordination = new ZKSplitLogManagerCoordination(this, watcher); splitLogManagerCoordination = new ZKSplitLogManagerCoordination(server.getConfiguration(),

watcher);
}

@Override
public Server getServer() {
return server;
} }


@Override @Override
Expand Down

0 comments on commit dd70cc3

Please sign in to comment.