Skip to content

Commit

Permalink
PHOENIX-2940 Query the stats table and cache stats in the client
Browse files Browse the repository at this point in the history
Side-steps the issue of how to safely compute statistics and tie
them to a PTable being constructed inside the regionserver. Stats
can be asynchronously updated on the client side, or synchronously
fetched if necessary without blocking other Phoenix clients.

Includes some unit tests and logging on stats-cache maintenance
to help us track usage going forward.
  • Loading branch information
joshelser committed Jun 20, 2016
1 parent 917e025 commit e53e3bb
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 182 deletions.
Expand Up @@ -44,7 +44,6 @@
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
Expand Down Expand Up @@ -209,24 +208,4 @@ private void ensureTablesOnDifferentRegionServers(String tableName1, String tabl
// verify index and data tables are on different servers
assertNotEquals("Tables " + tableName1 + " and " + tableName2 + " should be on different region servers", serverName1, serverName2);
}

@Test
public void testMetadataQos() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = driver.connect(getUrl(), props);
try {
ensureTablesOnDifferentRegionServers(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME);
// create the table
conn.createStatement().execute(
"CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR)");
// query the table from another connection, so that SYSTEM.STATS will be used
conn.createStatement().execute("SELECT * FROM "+DATA_TABLE_FULL_NAME);
// verify that that metadata queue is at least once
Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getMetadataRpcExecutor(), Mockito.atLeastOnce()).dispatch(Mockito.any(CallRunner.class));
}
finally {
conn.close();
}
}

}
Expand Up @@ -198,8 +198,6 @@
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
Expand Down Expand Up @@ -951,30 +949,13 @@ private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableT
addColumnToTable(results, colName, famName, colKeyValues, columns, saltBucketNum != null);
}
}
PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getPhysicalTableName(
Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), tableName.getBytes())), isNamespaceMapped)
.getNameAsString()) : physicalTables.get(0);
PTableStats stats = PTableStats.EMPTY_STATS;
if (tenantId == null) {
HTableInterface statsHTable = null;
try {
statsHTable = ServerUtil.getHTableForCoprocessorScan(env,
SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, env.getConfiguration())
.getName());
stats = StatisticsUtil.readStatistics(statsHTable, physicalTableName.getBytes(), clientTimeStamp);
timeStamp = Math.max(timeStamp, stats.getTimestamp());
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
logger.warn(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
env.getConfiguration()) + " not online yet?");
} finally {
if (statsHTable != null) statsHTable.close();
}
}
// Avoid querying the stats table because we're holding the rowLock here. Issuing an RPC to a remote
// server while holding this lock is a bad idea and likely to cause contention.
return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp, tableSeqNum,
pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName,
viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType,
rowKeyOrderOptimizable, transactional, updateCacheFrequency, stats, baseColumnCount,
rowKeyOrderOptimizable, transactional, updateCacheFrequency, baseColumnCount,
indexDisableTimestamp, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema);
}

Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.stats.GuidePostsInfo;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.QueryUtil;
Expand Down Expand Up @@ -123,7 +124,9 @@ private static boolean isSerial(StatementContext context, FilterableStatement st
if (perScanLimit == null || scan.getFilter() != null) {
return false;
}
GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : context.getConnection().getSCN();
PTableStats tableStats = context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(), scn);
GuidePostsInfo gpsInfo = tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
long estRowSize = SchemaUtil.estimateRowSize(table);
long estRegionSize;
if (gpsInfo == null) {
Expand Down
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
Expand Down Expand Up @@ -357,7 +358,11 @@ public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
physicalTableName = table.getPhysicalName().getBytes();
tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS;
Long currentSCN = context.getConnection().getSCN();
if (null == currentSCN) {
currentSCN = HConstants.LATEST_TIMESTAMP;
}
tableStats = useStats() ? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN) : PTableStats.EMPTY_STATS;
// Used to tie all the scans together during logging
scanId = UUID.randomUUID().toString();

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.PFunction;
Expand Down Expand Up @@ -132,4 +133,10 @@ public enum Feature {LOCAL_INDEX, RENEW_LEASE};

public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, String schemaName) throws SQLException;

/**
* Removes cache {@link PTableStats} for the table with the given name. If no cached stats are present, this does nothing.
*
* @param tableName The table to remove stats for
*/
void invalidateStats(ImmutableBytesPtr tableName);
}
Expand Up @@ -51,6 +51,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
Expand Down Expand Up @@ -180,7 +181,6 @@
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.schema.stats.PTableStats;
import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
Expand Down Expand Up @@ -216,8 +216,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand All @@ -230,16 +228,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
private static final int INITIAL_CHILD_SERVICES_CAPACITY = 100;
private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000;
// Max number of cached table stats for view or shared index physical tables
private static final int MAX_TABLE_STATS_CACHE_ENTRIES = 512;
protected final Configuration config;
private final ConnectionInfo connectionInfo;
// Copy of config.getProps(), but read-only to prevent synchronization that we
// don't need.
private final ReadOnlyProps props;
private final String userName;
private final ConcurrentHashMap<ImmutableBytesWritable,ConnectionQueryServices> childServices;
private final Cache<ImmutableBytesPtr, PTableStats> tableStatsCache;
private final TableStatsCache tableStatsCache;

// Cache the latest meta data here for future connections
// writes guarded by "latestMetaDataLock"
Expand Down Expand Up @@ -340,13 +336,6 @@ public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connec
// find the HBase version and use that to determine the KeyValueBuilder that should be used
String hbaseVersion = VersionInfo.getVersion();
this.kvBuilder = KeyValueBuilder.get(hbaseVersion);
long halfStatsUpdateFreq = config.getLong(
QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB,
QueryServicesOptions.DEFAULT_STATS_UPDATE_FREQ_MS) / 2;
tableStatsCache = CacheBuilder.newBuilder()
.maximumSize(MAX_TABLE_STATS_CACHE_ENTRIES)
.expireAfterWrite(halfStatsUpdateFreq, TimeUnit.MILLISECONDS)
.build();
this.returnSequenceValues = props.getBoolean(QueryServices.RETURN_SEQUENCE_VALUES_ATTRIB, QueryServicesOptions.DEFAULT_RETURN_SEQUENCE_VALUES);
this.renewLeaseEnabled = config.getBoolean(RENEW_LEASE_ENABLED, DEFAULT_RENEW_LEASE_ENABLED);
this.renewLeasePoolSize = config.getInt(RENEW_LEASE_THREAD_POOL_SIZE, DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE);
Expand All @@ -358,6 +347,8 @@ public ConnectionQueryServicesImpl(QueryServices services, ConnectionInfo connec
list.add(queue);
}
connectionQueues = ImmutableList.copyOf(list);
// A little bit of a smell to leak `this` here, but should not be a problem
this.tableStatsCache = new TableStatsCache(this, config);
}

@Override
Expand Down Expand Up @@ -3537,35 +3528,7 @@ private void throwConnectionClosedException() {
@Override
public PTableStats getTableStats(final byte[] physicalName, final long clientTimeStamp) throws SQLException {
try {
return tableStatsCache.get(new ImmutableBytesPtr(physicalName), new Callable<PTableStats>() {
@Override
public PTableStats call() throws Exception {
/*
* The shared view index case is tricky, because we don't have
* table metadata for it, only an HBase table. We do have stats,
* though, so we'll query them directly here and cache them so
* we don't keep querying for them.
*/
HTableInterface statsHTable = ConnectionQueryServicesImpl.this
.getTable(SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES,
ConnectionQueryServicesImpl.this.getProps()).getName());
try {
return StatisticsUtil.readStatistics(statsHTable, physicalName, clientTimeStamp);
} catch (IOException e) {
logger.warn("Unable to read from stats table", e);
// Just cache empty stats. We'll try again after some time anyway.
return PTableStats.EMPTY_STATS;
} finally {
try {
statsHTable.close();
} catch (IOException e) {
// Log, but continue. We have our stats anyway now.
logger.warn("Unable to close stats table", e);
}
}
}

});
return tableStatsCache.get(new ImmutableBytesPtr(physicalName));
} catch (ExecutionException e) {
throw ServerUtil.parseServerException(e);
}
Expand Down Expand Up @@ -3926,4 +3889,19 @@ private void ensureNamespaceDropped(String schemaName, long mutationTime) throws
}
}

/**
* Manually adds {@link PTableStats} for a table to the client-side cache. Not a
* {@link ConnectionQueryServices} method. Exposed for testing purposes.
*
* @param tableName Table name
* @param stats Stats instance
*/
public void addTableStats(ImmutableBytesPtr tableName, PTableStats stats) {
this.tableStatsCache.put(Objects.requireNonNull(tableName), stats);
}

@Override
public void invalidateStats(ImmutableBytesPtr tableName) {
this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
}
}
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
Expand All @@ -50,6 +51,7 @@
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
Expand Down Expand Up @@ -112,6 +114,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
private volatile boolean initialized;
private volatile SQLException initializationException;
private final Map<String, List<HRegionLocation>> tableSplits = Maps.newHashMap();
private final TableStatsCache tableStatsCache;

public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) {
super(services);
Expand All @@ -138,6 +141,7 @@ public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo co
config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config);
TransactionManager txnManager = new TransactionManager(config);
this.txSystemClient = new InMemoryTxSystemClient(txnManager);
this.tableStatsCache = new TableStatsCache(this, config);
}

private PMetaData newEmptyMetaData() {
Expand Down Expand Up @@ -516,7 +520,11 @@ public String getUserName() {

@Override
public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) {
return PTableStats.EMPTY_STATS;
PTableStats stats = tableStatsCache.getCache().getIfPresent(physicalName);
if (null == stats) {
return PTableStats.EMPTY_STATS;
}
return stats;
}

@Override
Expand Down Expand Up @@ -629,4 +637,20 @@ public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, String schemaName) {
return new MetaDataMutationResult(MutationCode.SCHEMA_ALREADY_EXISTS, 0, null);
}

/**
* Manually adds {@link PTableStats} for a table to the client-side cache. Not a
* {@link ConnectionQueryServices} method. Exposed for testing purposes.
*
* @param tableName Table name
* @param stats Stats instance
*/
public void addTableStats(ImmutableBytesPtr tableName, PTableStats stats) {
this.tableStatsCache.put(Objects.requireNonNull(tableName), stats);
}

@Override
public void invalidateStats(ImmutableBytesPtr tableName) {
this.tableStatsCache.invalidate(Objects.requireNonNull(tableName));
}
}
Expand Up @@ -33,6 +33,7 @@
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.PFunction;
Expand Down Expand Up @@ -341,4 +342,9 @@ public PMetaData removeSchema(PSchema schema, long schemaTimeStamp) {
public MetaDataMutationResult dropSchema(List<Mutation> schemaMetaData, String schemaName) throws SQLException {
return getDelegate().dropSchema(schemaMetaData, schemaName);
}

@Override
public void invalidateStats(ImmutableBytesPtr tableName) {
getDelegate().invalidateStats(tableName);
}
}
Expand Up @@ -161,6 +161,8 @@ public interface QueryServices extends SQLCloseable {
public static final String RUN_UPDATE_STATS_ASYNC = "phoenix.update.stats.command.async";
public static final String STATS_SERVER_POOL_SIZE = "phoenix.stats.pool.size";
public static final String COMMIT_STATS_ASYNC = "phoenix.stats.commit.async";
// Maximum size in bytes taken up by cached table stats in the client
public static final String STATS_MAX_CACHE_SIZE = "phoenix.stats.cache.maxSize";

public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";
Expand Down
Expand Up @@ -189,6 +189,8 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_RUN_UPDATE_STATS_ASYNC = true;
public static final boolean DEFAULT_COMMIT_STATS_ASYNC = true;
public static final int DEFAULT_STATS_POOL_SIZE = 4;
// Maximum size (in bytes) that cached table stats should take upm
public static final long DEFAULT_STATS_MAX_CACHE_SIZE = 256 * 1024 * 1024;

public static final boolean DEFAULT_USE_REVERSE_SCAN = true;

Expand Down

0 comments on commit e53e3bb

Please sign in to comment.