Skip to content

Commit

Permalink
PHOENIX-4317 Update RPC controller to use the updated APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsinghal committed Nov 8, 2017
1 parent b989e55 commit 136c7a6
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 43 deletions.
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException; import java.io.IOException;


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.query.QueryServicesOptions;


Expand All @@ -42,22 +43,20 @@ public class PhoenixRpcScheduler extends RpcScheduler {
private RpcExecutor indexCallExecutor; private RpcExecutor indexCallExecutor;
private RpcExecutor metadataCallExecutor; private RpcExecutor metadataCallExecutor;
private int port; private int port;



public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) { public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, PriorityFunction priorityFunction, Abortable abortable) {
// copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
int numIndexQueues = Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
int numMetadataQueues = Math.max(1, Math.round(metadataHandlerCount * callQueuesHandlersFactor));


this.indexPriority = indexPriority; this.indexPriority = indexPriority;
this.metadataPriority = metadataPriority; this.metadataPriority = metadataPriority;
this.delegate = delegate; this.delegate = delegate;
this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, numIndexQueues, maxIndexQueueLength); this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable);
this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, numMetadataQueues, maxMetadataQueueLength); this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable);
} }


@Override @Override
Expand All @@ -82,7 +81,7 @@ public void stop() {


@Override @Override
public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException { public boolean dispatch(CallRunner callTask) throws InterruptedException, IOException {
RpcServer.Call call = callTask.getCall(); ServerCall call = callTask.getCall();
int priority = call.header.getPriority(); int priority = call.header.getPriority();
if (indexPriority == priority) { if (indexPriority == priority) {
return indexCallExecutor.dispatch(callTask); return indexCallExecutor.dispatch(callTask);
Expand Down Expand Up @@ -134,6 +133,36 @@ public void setIndexExecutorForTesting(RpcExecutor executor) {
public void setMetadataExecutorForTesting(RpcExecutor executor) { public void setMetadataExecutorForTesting(RpcExecutor executor) {
this.metadataCallExecutor = executor; this.metadataCallExecutor = executor;
} }

@Override
public int getWriteQueueLength() {
return delegate.getWriteQueueLength();
}

@Override
public int getReadQueueLength() {
return delegate.getReadQueueLength();
}

@Override
public int getScanQueueLength() {
return delegate.getScanQueueLength();
}

@Override
public int getActiveWriteRpcHandlerCount() {
return delegate.getActiveWriteRpcHandlerCount();
}

@Override
public int getActiveReadRpcHandlerCount() {
return delegate.getActiveReadRpcHandlerCount();
}

@Override
public int getActiveScanRpcHandlerCount() {
return delegate.getActiveScanRpcHandlerCount();
}




} }
Expand Up @@ -22,9 +22,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory; import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServices;
Expand Down Expand Up @@ -67,7 +64,7 @@ public RpcScheduler create(Configuration conf, PriorityFunction priorityFunction
LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority); LOG.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority);


PhoenixRpcScheduler scheduler = PhoenixRpcScheduler scheduler =
new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority); new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, priorityFunction,abortable);
return scheduler; return scheduler;
} }


Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;


/** /**
Expand All @@ -36,24 +36,24 @@ public ClientRpcControllerFactory(Configuration conf) {
} }


@Override @Override
public PayloadCarryingRpcController newController() { public HBaseRpcController newController() {
PayloadCarryingRpcController delegate = super.newController(); HBaseRpcController delegate = super.newController();
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) { public HBaseRpcController newController(CellScanner cellScanner) {
PayloadCarryingRpcController delegate = super.newController(cellScanner); HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { public HBaseRpcController newController(List<CellScannable> cellIterables) {
PayloadCarryingRpcController delegate = super.newController(cellIterables); HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate); return getController(delegate);
} }


private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) { private HBaseRpcController getController(HBaseRpcController delegate) {
return new MetadataRpcController(delegate, conf); return new MetadataRpcController(delegate, conf);
} }


Expand Down
Expand Up @@ -19,8 +19,8 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.query.QueryServicesOptions;
Expand All @@ -31,12 +31,12 @@
* {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix index * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix index
* tables. * tables.
*/ */
class IndexRpcController extends DelegatingPayloadCarryingRpcController { class IndexRpcController extends DelegatingHBaseRpcController {


private final int priority; private final int priority;
private final String tracingTableName; private final String tracingTableName;


public IndexRpcController(PayloadCarryingRpcController delegate, Configuration conf) { public IndexRpcController(HBaseRpcController delegate, Configuration conf) {
super(delegate); super(delegate);
this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf); this.priority = PhoenixRpcSchedulerFactory.getIndexPriority(conf);
this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB, this.tracingTableName = conf.get(QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB,
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;


/** /**
Expand All @@ -39,24 +39,24 @@ public InterRegionServerIndexRpcControllerFactory(Configuration conf) {
} }


@Override @Override
public PayloadCarryingRpcController newController() { public HBaseRpcController newController() {
PayloadCarryingRpcController delegate = super.newController(); HBaseRpcController delegate = super.newController();
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) { public HBaseRpcController newController(CellScanner cellScanner) {
PayloadCarryingRpcController delegate = super.newController(cellScanner); HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { public HBaseRpcController newController(List<CellScannable> cellIterables) {
PayloadCarryingRpcController delegate = super.newController(cellIterables); HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate); return getController(delegate);
} }


private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) { private HBaseRpcController getController(HBaseRpcController delegate) {
// construct a chain of controllers: metadata, index and standard controller // construct a chain of controllers: metadata, index and standard controller
IndexRpcController indexRpcController = new IndexRpcController(delegate, conf); IndexRpcController indexRpcController = new IndexRpcController(delegate, conf);
return new MetadataRpcController(indexRpcController, conf); return new MetadataRpcController(indexRpcController, conf);
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;


/** /**
Expand All @@ -37,24 +37,24 @@ public InterRegionServerMetadataRpcControllerFactory(Configuration conf) {
} }


@Override @Override
public PayloadCarryingRpcController newController() { public HBaseRpcController newController() {
PayloadCarryingRpcController delegate = super.newController(); HBaseRpcController delegate = super.newController();
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(CellScanner cellScanner) { public HBaseRpcController newController(CellScanner cellScanner) {
PayloadCarryingRpcController delegate = super.newController(cellScanner); HBaseRpcController delegate = super.newController(cellScanner);
return getController(delegate); return getController(delegate);
} }


@Override @Override
public PayloadCarryingRpcController newController(List<CellScannable> cellIterables) { public HBaseRpcController newController(List<CellScannable> cellIterables) {
PayloadCarryingRpcController delegate = super.newController(cellIterables); HBaseRpcController delegate = super.newController(cellIterables);
return getController(delegate); return getController(delegate);
} }


private PayloadCarryingRpcController getController(PayloadCarryingRpcController delegate) { private HBaseRpcController getController(HBaseRpcController delegate) {
// construct a chain of controllers: metadata and delegate controller // construct a chain of controllers: metadata and delegate controller
return new MetadataRpcController(delegate, conf); return new MetadataRpcController(delegate, conf);
} }
Expand Down
Expand Up @@ -21,8 +21,8 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.DelegatingPayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SchemaUtil;
Expand All @@ -34,7 +34,7 @@
* {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM * {@link RpcController} that sets the appropriate priority of RPC calls destined for Phoenix SYSTEM
* tables * tables
*/ */
class MetadataRpcController extends DelegatingPayloadCarryingRpcController { class MetadataRpcController extends DelegatingHBaseRpcController {


private int priority; private int priority;
// list of system tables // list of system tables
Expand All @@ -53,7 +53,7 @@ class MetadataRpcController extends DelegatingPayloadCarryingRpcController {
.getNameAsString()) .getNameAsString())
.build(); .build();


public MetadataRpcController(PayloadCarryingRpcController delegate, public MetadataRpcController(HBaseRpcController delegate,
Configuration conf) { Configuration conf) {
super(delegate); super(delegate);
this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf); this.priority = PhoenixRpcSchedulerFactory.getMetadataPriority(conf);
Expand Down

0 comments on commit 136c7a6

Please sign in to comment.