Skip to content

Commit

Permalink
HBASE-19301 Provide way for CPs to create short circuited connection …
Browse files Browse the repository at this point in the history
…with custom configurations.
  • Loading branch information
anoopsjohn committed Nov 21, 2017
1 parent 48cf4c7 commit 984e0ec
Show file tree
Hide file tree
Showing 27 changed files with 226 additions and 38 deletions.
Expand Up @@ -127,6 +127,7 @@ public static void setServerSideHConnectionRetriesConfig(final Configuration c,
* localhost if the invocation target is 'this' server; save on network and protobuf * localhost if the invocation target is 'this' server; save on network and protobuf
* invocations. * invocations.
*/ */
// TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
@VisibleForTesting // Class is visible so can assert we are short-circuiting when expected. @VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
public static class ShortCircuitingClusterConnection extends ConnectionImplementation { public static class ShortCircuitingClusterConnection extends ConnectionImplementation {
private final ServerName serverName; private final ServerName serverName;
Expand Down
Expand Up @@ -53,6 +53,8 @@ public interface Server extends Abortable, Stoppable {
*/ */
Connection getConnection(); Connection getConnection();


Connection createConnection(Configuration conf) throws IOException;

/** /**
* Returns a reference to the servers' cluster connection. Prefer {@link #getConnection()}. * Returns a reference to the servers' cluster connection. Prefer {@link #getConnection()}.
* *
Expand Down
Expand Up @@ -19,13 +19,16 @@


package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.metrics.MetricRegistry;


@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving @InterfaceStability.Evolving
Expand All @@ -41,15 +44,31 @@ public interface MasterCoprocessorEnvironment extends CoprocessorEnvironment<Mas
* struggling or it is on the other side of a network partition. Any use of Connection from * struggling or it is on the other side of a network partition. Any use of Connection from
* inside a Coprocessor must be able to handle all such hiccups. * inside a Coprocessor must be able to handle all such hiccups.
* *
* <p>Using a Connection to get at a local resource -- say a Region that is on the local * <p>Using this Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a * Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC (and * short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* protobuf marshalling/unmarshalling). * <p>
* * Note: If you want to create Connection with your own Configuration and NOT use the Master's
* Connection (though its cache of locations will be warm, and its life-cycle is not the concern
* of the CP), see {@link #createConnection(Configuration)}.
* @return The host's Connection to the Cluster. * @return The host's Connection to the Cluster.
*/ */
Connection getConnection(); Connection getConnection();


/**
* Creates a cluster connection using the passed configuration.
* <p>Using this Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* <p>
* Note: HBase will NOT cache/maintain this Connection. If Coprocessors need to cache and reuse
* this connection, it has to be done by Coprocessors. Also make sure to close it after use.
*
* @param conf configuration
* @return Connection created using the passed conf.
*/
Connection createConnection(Configuration conf) throws IOException;

/** /**
* Returns a MetricRegistry that can be used to track metrics at the master level. * Returns a MetricRegistry that can be used to track metrics at the master level.
* *
Expand Down
Expand Up @@ -19,8 +19,10 @@


package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;


import java.io.IOException;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -62,13 +64,29 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
* *
* <p>Using a Connection to get at a local resource -- say a Region that is on the local * <p>Using a Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a * Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC (and * short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* protobuf marshalling/unmarshalling). *<p>
* * Note: If you want to create Connection with your own Configuration and NOT use the RegionServer
* Connection (though its cache of locations will be warm, and its life-cycle is not the concern
* of the CP), see {@link #createConnection(Configuration)}.
* @return The host's Connection to the Cluster. * @return The host's Connection to the Cluster.
*/ */
Connection getConnection(); Connection getConnection();


/**
* Creates a cluster connection using the passed configuration.
* <p>Using this Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* <p>
* Note: HBase will NOT cache/maintain this Connection. If Coprocessors need to cache and reuse
* this connection, it has to be done by Coprocessors. Also make sure to close it after use.
*
* @param conf configuration
* @return Connection created using the passed conf.
*/
Connection createConnection(Configuration conf) throws IOException;

/** /**
* Returns a MetricRegistry that can be used to track metrics at the region server level. All * Returns a MetricRegistry that can be used to track metrics at the region server level. All
* metrics tracked at this level will be shared by all the coprocessor instances * metrics tracked at this level will be shared by all the coprocessor instances
Expand Down
Expand Up @@ -18,6 +18,9 @@
*/ */
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -49,13 +52,29 @@ public interface RegionServerCoprocessorEnvironment
* *
* <p>Using a Connection to get at a local resource -- say a Region that is on the local * <p>Using a Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a * Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC (and * short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* protobuf marshalling/unmarshalling). *<p>
* * Note: If you want to create Connection with your own Configuration and NOT use the RegionServer
* Connection (though its cache of locations will be warm, and its life-cycle is not the concern
* of the CP), see {@link #createConnection(Configuration)}.
* @return The host's Connection to the Cluster. * @return The host's Connection to the Cluster.
*/ */
Connection getConnection(); Connection getConnection();


/**
* Creates a cluster connection using the passed configuration.
* <p>Using this Connection to get at a local resource -- say a Region that is on the local
* Server or using Admin Interface from a Coprocessor hosted on the Master -- will result in a
* short-circuit of the RPC framework to make a direct invocation avoiding RPC.
* <p>
* Note: HBase will NOT cache/maintain this Connection. If Coprocessors need to cache and reuse
* this connection, it has to be done by Coprocessors. Also make sure to close it after use.
*
* @param conf configuration
* @return Connection created using the passed conf.
*/
Connection createConnection(Configuration conf) throws IOException;

/** /**
* Returns a MetricRegistry that can be used to track metrics at the region server level. * Returns a MetricRegistry that can be used to track metrics at the region server level.
* *
Expand Down
Expand Up @@ -78,16 +78,14 @@ public class MasterCoprocessorHost
*/ */
private static class MasterEnvironment extends BaseEnvironment<MasterCoprocessor> private static class MasterEnvironment extends BaseEnvironment<MasterCoprocessor>
implements MasterCoprocessorEnvironment { implements MasterCoprocessorEnvironment {
private final Connection connection;
private final ServerName serverName;
private final boolean supportGroupCPs; private final boolean supportGroupCPs;
private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry;
private final MasterServices services;


public MasterEnvironment(final MasterCoprocessor impl, final int priority, final int seq, public MasterEnvironment(final MasterCoprocessor impl, final int priority, final int seq,
final Configuration conf, final MasterServices services) { final Configuration conf, final MasterServices services) {
super(impl, priority, seq, conf); super(impl, priority, seq, conf);
this.connection = services.getConnection(); this.services = services;
this.serverName = services.getServerName();
supportGroupCPs = !useLegacyMethod(impl.getClass(), supportGroupCPs = !useLegacyMethod(impl.getClass(),
"preBalanceRSGroup", ObserverContext.class, String.class); "preBalanceRSGroup", ObserverContext.class, String.class);
this.metricRegistry = this.metricRegistry =
Expand All @@ -96,12 +94,17 @@ public MasterEnvironment(final MasterCoprocessor impl, final int priority, final


@Override @Override
public ServerName getServerName() { public ServerName getServerName() {
return this.serverName; return this.services.getServerName();
} }


@Override @Override
public Connection getConnection() { public Connection getConnection() {
return this.connection; return this.services.getConnection();
}

@Override
public Connection createConnection(Configuration conf) throws IOException {
return this.services.createConnection(conf);
} }


@Override @Override
Expand Down
Expand Up @@ -3715,4 +3715,11 @@ public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
public NettyEventLoopGroupConfig getEventLoopGroupConfig() { public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
return eventLoopGroupConfig; return eventLoopGroupConfig;
} }

@Override
public Connection createConnection(Configuration conf) throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName,
this.rpcServices, this.rpcServices);
}
} }
Expand Up @@ -114,9 +114,7 @@ private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor
private Region region; private Region region;
ConcurrentMap<String, Object> sharedData; ConcurrentMap<String, Object> sharedData;
private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry;
private final Connection connection; private final RegionServerServices services;
private final ServerName serverName;
private final OnlineRegions onlineRegions;


/** /**
* Constructor * Constructor
Expand All @@ -128,11 +126,8 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority,
final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) { final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf); super(impl, priority, seq, conf);
this.region = region; this.region = region;
// Mocks may have services as null at test time.
this.connection = services != null? services.getConnection(): null;
this.serverName = services != null? services.getServerName(): null;
this.sharedData = sharedData; this.sharedData = sharedData;
this.onlineRegions = services; this.services = services;
this.metricRegistry = this.metricRegistry =
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
} }
Expand All @@ -144,17 +139,23 @@ public Region getRegion() {
} }


public OnlineRegions getOnlineRegions() { public OnlineRegions getOnlineRegions() {
return this.onlineRegions; return this.services;
} }


@Override @Override
public Connection getConnection() { public Connection getConnection() {
return this.connection; // Mocks may have services as null at test time.
return services != null ? services.getConnection() : null;
}

@Override
public Connection createConnection(Configuration conf) throws IOException {
return services != null ? this.services.createConnection(conf) : null;
} }


@Override @Override
public ServerName getServerName() { public ServerName getServerName() {
return this.serverName; return services != null? services.getServerName(): null;
} }


@Override @Override
Expand Down
Expand Up @@ -210,9 +210,7 @@ public void call(RegionServerObserver observer) throws IOException {
private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor> private static class RegionServerEnvironment extends BaseEnvironment<RegionServerCoprocessor>
implements RegionServerCoprocessorEnvironment { implements RegionServerCoprocessorEnvironment {
private final MetricRegistry metricRegistry; private final MetricRegistry metricRegistry;
private final Connection connection; private final RegionServerServices services;
private final ServerName serverName;
private final OnlineRegions onlineRegions;


@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
justification="Intentional; FB has trouble detecting isAssignableFrom") justification="Intentional; FB has trouble detecting isAssignableFrom")
Expand All @@ -223,26 +221,29 @@ public RegionServerEnvironment(final RegionServerCoprocessor impl, final int pri
for (Service service : impl.getServices()) { for (Service service : impl.getServices()) {
services.registerService(service); services.registerService(service);
} }
this.onlineRegions = services; this.services = services;
this.connection = services.getConnection();
this.serverName = services.getServerName();
this.metricRegistry = this.metricRegistry =
MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName()); MetricsCoprocessor.createRegistryForRSCoprocessor(impl.getClass().getName());
} }


@Override @Override
public OnlineRegions getOnlineRegions() { public OnlineRegions getOnlineRegions() {
return this.onlineRegions; return this.services;
} }


@Override @Override
public ServerName getServerName() { public ServerName getServerName() {
return this.serverName; return this.services.getServerName();
} }


@Override @Override
public Connection getConnection() { public Connection getConnection() {
return this.connection; return this.services.getConnection();
}

@Override
public Connection createConnection(Configuration conf) throws IOException {
return this.services.createConnection(conf);
} }


@Override @Override
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;


import java.io.IOException;

import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down Expand Up @@ -203,5 +206,10 @@ public FileSystem getFileSystem() {
public boolean isStopping() { public boolean isStopping() {
return false; return false;
} }

@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
} }
} }
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
Expand Down Expand Up @@ -329,4 +330,9 @@ public void unassign(byte[] regionName) throws IOException {
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() { public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return null; return null;
} }

@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
} }
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
Expand Down Expand Up @@ -460,4 +461,9 @@ public ProcedureEvent getInitializedEvent() {
public FileSystem getFileSystem() { public FileSystem getFileSystem() {
return null; return null;
} }

@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
} }
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
Expand Down Expand Up @@ -672,4 +673,9 @@ public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
throws ServiceException { throws ServiceException {
return null; return null;
} }

@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
} }

0 comments on commit 984e0ec

Please sign in to comment.