Commit fc6d140

HBASE-19904 Break dependency of WAL constructor on Replication

Apache9 committed Feb 2, 2018
1 parent a2bc19a commit fc6d140

Showing 51 changed files with 465 additions and 474 deletions.
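In outline, the change inverts the construction order between the WAL and replication: previously the replication services had to exist first so their WALActionsListeners could be passed to the WALFactory constructor, and only afterwards were they initialized against the factory; now the factory is built standalone and replication receives the finished WALProvider via ReplicationService.initialize(). A condensed sketch of the new wiring, using only signatures that appear in the diffs below (the fields are HRegionServer's):

    // Sketch condensed from the setupWALAndReplication() hunk further down;
    // not the full method.
    private void setupWALAndReplication() throws IOException {
      // The WAL factory no longer takes a replication-supplied listener list.
      WALFactory factory = new WALFactory(conf, serverName.toString());
      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
      Path logDir = new Path(walRootDir,
          AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
      // Replication is created and initialized in one step, consuming the
      // already-built WALProvider instead of contributing listeners to it.
      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
          factory.getWALProvider());
      this.walFactory = factory;
    }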
@@ -127,7 +127,7 @@ public static void tearDownAfterClass() throws Exception {
    */
   @Test
   public void testPartialRead() throws Exception {
-    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    final WALFactory walfactory = new WALFactory(conf, getName());
     WAL log = walfactory.getWAL(info);
     // This test depends on timestamp being millisecond based and the filename of the WAL also
     // being millisecond based.
@@ -186,7 +186,7 @@ public void testPartialRead() throws Exception {
    */
   @Test
   public void testWALRecordReader() throws Exception {
-    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    final WALFactory walfactory = new WALFactory(conf, getName());
     WAL log = walfactory.getWAL(info);
     byte [] value = Bytes.toBytes("value");
     WALEdit edit = new WALEdit();
@@ -25,6 +25,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -122,4 +123,13 @@ public static boolean isKeyConfigEqual(ReplicationPeerConfig rpc1, ReplicationPe
       isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
   }
   }
+
+  /**
+   * @param c Configuration to look at
+   * @return True if replication for bulk load data is enabled.
+   */
+  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+  }
 }
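The helper added here is what both decorate methods later in this commit call. A quick usage sketch, assuming only the two HConstants fields referenced above:

    // Minimal sketch: flip the bulk-load replication flag and read it back.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // With the flag set, the master/region server decorators below will go on
    // to register the bulk-load cleaner and observer.
    boolean enabled = ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);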
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.master;

+import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
+
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
 import java.io.IOException;
@@ -166,8 +168,10 @@
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -484,7 +488,7 @@ public HMaster(final Configuration conf)
     // Disable usage of meta replicas in the master
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

-    Replication.decorateMasterConfiguration(this.conf);
+    decorateMasterConfiguration(this.conf);

     // Hack! Maps DFSClient => Master for logs. HDFS made this
     // config param for task trackers, but we can piggyback off of it.
@@ -3557,4 +3561,23 @@ public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
   public ReplicationPeerManager getReplicationPeerManager() {
     return replicationPeerManager;
   }
+
+  /**
+   * This method modifies the master's configuration in order to inject
+   * replication-related features.
+   */
+  @VisibleForTesting
+  public static void decorateMasterConfiguration(Configuration conf) {
+    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
+    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
+    if (!plugins.contains(cleanerClass)) {
+      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
+    }
+    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
+      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
+      if (!plugins.contains(cleanerClass)) {
+        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
+      }
+    }
+  }
 }
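Because decorateMasterConfiguration() only appends a cleaner when plugins.contains(cleanerClass) is false, the decoration is idempotent across repeated calls. A sketch of the observable effect (the resulting value shown is illustrative):

    Configuration conf = HBaseConfiguration.create();
    HMaster.decorateMasterConfiguration(conf);
    HMaster.decorateMasterConfiguration(conf); // append-if-absent: second call changes nothing
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    // plugins now names ReplicationLogCleaner exactly once, e.g. ends with
    // ",org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner"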
@@ -20,6 +20,8 @@
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -69,7 +71,6 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -148,28 +149,6 @@
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
-import org.apache.hbase.thirdparty.com.google.protobuf.Service;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -200,7 +179,29 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.Service;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;

 /**
  * Regions store data for a certain region of a table. It stores all columns
@@ -131,10 +131,9 @@
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
-import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -158,6 +157,7 @@
 import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@@ -546,7 +546,7 @@ public HRegionServer(Configuration conf) throws IOException {
     checkCodecs(this.conf);
     this.userProvider = UserProvider.instantiate(conf);
     FSUtils.setupShortCircuitRead(this.conf);
-    Replication.decorateRegionServerConfiguration(this.conf);
+    decorateRegionServerConfiguration(this.conf);

     // Disable usage of meta replicas in the regionserver
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
@@ -1781,52 +1781,26 @@ public boolean isOnline() {
   }

   /**
-   * Setup WAL log and replication if enabled.
-   * Replication setup is done in here because it wants to be hooked up to WAL.
-   *
-   * @throws IOException
+   * Setup WAL log and replication if enabled. Replication setup is done in here because it wants to
+   * be hooked up to WAL.
    */
   private void setupWALAndReplication() throws IOException {
+    WALFactory factory = new WALFactory(conf, serverName.toString());
+
     // TODO Replication make assumptions here based on the default filesystem impl
     Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());

     Path logDir = new Path(walRootDir, logName);
-    if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
+    LOG.debug("logDir={}", logDir);
     if (this.walFs.exists(logDir)) {
-      throw new RegionServerRunningException("Region server has already " +
-        "created directory at " + this.serverName.toString());
+      throw new RegionServerRunningException(
+          "Region server has already created directory at " + this.serverName.toString());
     }

     // Instantiate replication if replication enabled. Pass it the log directories.
-    // In here we create the Replication instances. Later they are initialized and started up.
-    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
-
-    // listeners the wal factory will add to wals it creates.
-    List<WALActionsListener> listeners = new ArrayList<>();
-    listeners.add(new MetricsWAL());
-    if (this.replicationSourceHandler != null &&
-        this.replicationSourceHandler.getWALActionsListener() != null) {
-      // Replication handler is an implementation of WALActionsListener.
-      listeners.add(this.replicationSourceHandler.getWALActionsListener());
-    }
-
-    // There is a cyclic dependency between ReplicationSourceHandler and WALFactory.
-    // We use WALActionsListener to get the newly rolled WALs, so we need to get the
-    // WALActionsListeners from ReplicationSourceHandler before constructing WALFactory. And then
-    // ReplicationSourceHandler need to use WALFactory get the length of the wal file being written.
-    // So we here we need to construct WALFactory first, and then pass it to the initialized method
-    // of ReplicationSourceHandler.
-    // TODO: I can't follow replication; it has initialize and then later on we start it!
-    WALFactory factory = new WALFactory(conf, listeners, serverName.toString());
+    createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
+      factory.getWALProvider());
     this.walFactory = factory;
-    if (this.replicationSourceHandler != null) {
-      this.replicationSourceHandler.initialize(this, walFs, logDir, oldLogDir, factory);
-    }
-    if (this.replicationSinkHandler != null &&
-        this.replicationSinkHandler != this.replicationSourceHandler) {
-      this.replicationSinkHandler.initialize(this, walFs, logDir, oldLogDir, factory);
-    }
   }

   /**
@@ -2918,15 +2892,13 @@ public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
   //
   // Main program and support routines
   //
-
   /**
    * Load the replication executorService objects, if any
    */
   private static void createNewReplicationInstance(Configuration conf, HRegionServer server,
-      FileSystem walFs, Path walDir, Path oldWALDir) throws IOException {
-
-    if ((server instanceof HMaster) && (!LoadBalancer.isTablesOnMaster(conf) ||
-        LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
+      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) throws IOException {
+    if ((server instanceof HMaster) &&
+        (!LoadBalancer.isTablesOnMaster(conf) || LoadBalancer.isSystemTablesOnlyOnMaster(conf))) {
       return;
     }
@@ -2941,32 +2913,30 @@ private static void createNewReplicationInstance(Configuration conf, HRegionServ
     // If both the sink and the source class names are the same, then instantiate
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
-      server.replicationSourceHandler =
-          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
-            walDir, oldWALDir);
+      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
       server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
     } else {
-      server.replicationSourceHandler =
-          (ReplicationSourceService) newReplicationInstance(sourceClassname, conf, server, walFs,
-            walDir, oldWALDir);
-      server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname,
-          conf, server, walFs, walDir, oldWALDir);
+      server.replicationSourceHandler = newReplicationInstance(sourceClassname,
+        ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+      server.replicationSinkHandler = newReplicationInstance(sinkClassname,
+        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
     }
   }

-  private static ReplicationService newReplicationInstance(String classname, Configuration conf,
-      HRegionServer server, FileSystem walFs, Path logDir, Path oldLogDir) throws IOException {
-    Class<? extends ReplicationService> clazz = null;
+  private static <T extends ReplicationService> T newReplicationInstance(String classname,
+      Class<T> xface, Configuration conf, HRegionServer server, FileSystem walFs, Path logDir,
+      Path oldLogDir, WALProvider walProvider) throws IOException {
+    Class<? extends T> clazz = null;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-      clazz = Class.forName(classname, true, classLoader).asSubclass(ReplicationService.class);
+      clazz = Class.forName(classname, true, classLoader).asSubclass(xface);
     } catch (java.lang.ClassNotFoundException nfe) {
       throw new IOException("Could not find class for " + classname);
     }
-
-    // create an instance of the replication object, but do not initialize it here as we need to use
-    // WALFactory when initializing.
-    return ReflectionUtils.newInstance(clazz, conf);
+    T service = ReflectionUtils.newInstance(clazz, conf);
+    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
+    return service;
   }

@@ -3739,4 +3709,20 @@ void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException
       throw ProtobufUtil.getRemoteException(se);
     }
   }
+
+  /**
+   * This method modifies the region server's configuration in order to inject replication-related
+   * features
+   * @param conf region server configurations
+   */
+  static void decorateRegionServerConfiguration(Configuration conf) {
+    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
+      String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
+      String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
+      if (!plugins.contains(rsCoprocessorClass)) {
+        conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+          plugins + "," + rsCoprocessorClass);
+      }
+    }
+  }
 }
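The generified newReplicationInstance is the piece that lets one helper serve both handler fields without unchecked casts: the caller passes the expected interface as a Class token and gets a typed, already-initialized instance back. A stripped-down, self-contained sketch of the same pattern (the type names here are illustrative stand-ins, not HBase classes):

    import java.io.IOException;

    // Illustrative stand-ins for the real HBase service types.
    interface Service { void initialize() throws IOException; }
    interface SourceService extends Service { }

    final class NewReplicationInstanceSketch {
      // Same shape as the commit's newReplicationInstance: the Class token both
      // narrows the reflective lookup and types the return value, so call sites
      // need no (ReplicationSourceService) / (ReplicationSinkService) casts.
      static <T extends Service> T newInstance(String classname, Class<T> xface)
          throws IOException {
        try {
          Class<? extends T> clazz = Class.forName(classname).asSubclass(xface);
          T service = clazz.getDeclaredConstructor().newInstance();
          service.initialize(); // initialized before being handed back, as in the commit
          return service;
        } catch (ReflectiveOperationException e) {
          throw new IOException("Could not instantiate " + classname, e);
        }
      }
    }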
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,13 +18,12 @@
 package org.apache.hadoop.hbase.regionserver;

 import java.io.IOException;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;

 /**
  * Gateway to Cluster Replication. Used by
@@ -37,14 +35,14 @@ public interface ReplicationService {

   /**
    * Initializes the replication service object.
-   * @throws IOException
+   * @param walProvider can be null if not initialized inside a live region server environment, for
+   *          example, {@code ReplicationSyncUp}.
    */
-  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
-      WALFileLengthProvider walFileLengthProvider) throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
+      throws IOException;

   /**
    * Start replication services.
-   * @throws IOException
    */
   void startReplicationService() throws IOException;
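Under the new contract, an implementation is handed the WALProvider at initialize() time and must tolerate a null provider (the ReplicationSyncUp case called out in the javadoc above). A minimal sketch of a conforming implementor — hypothetical, not a class from this commit, and kept abstract so the interface's remaining methods (not shown in this hunk) can be elided:

    // Hypothetical skeleton illustrating the new initialize() contract.
    public abstract class SketchReplicationService implements ReplicationService {

      private WALProvider walProvider; // null when run outside a live region server

      @Override
      public void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir,
          WALProvider walProvider) throws IOException {
        // Keep the provider to observe rolled WALs; guard every use against null.
        this.walProvider = walProvider;
      }

      @Override
      public void startReplicationService() throws IOException {
        // Sources/sinks start here; initialize() has already supplied walProvider.
      }
    }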
