
PHOENIX-6883: Phoenix Metadata Caching Redesign #1883

Open
wants to merge 10 commits into base: master
InvalidateMetadataCacheController.java (new file)
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc.controller;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.phoenix.util.IndexUtil;

/**
* Controller used for RPCs that invalidate the server-side metadata cache.
*/
public class InvalidateMetadataCacheController extends DelegatingHBaseRpcController {
private int priority;

public InvalidateMetadataCacheController(HBaseRpcController delegate, Configuration conf) {
super(delegate);
this.priority = IndexUtil.getInvalidateMetadataCachePriority(conf);
}

@Override
public void setPriority(int priority) {
this.priority = priority;
}

@Override
public void setPriority(TableName tn) {
// Nothing
}

@Override
public int getPriority() {
return this.priority;
}
}
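A minimal usage sketch of the controller above, assuming a plain HBaseConfiguration; the delegate comes from the stock RpcControllerFactory, and the resulting priority is whatever IndexUtil.getInvalidateMetadataCachePriority(conf) resolves for that configuration (the config key behind it is not shown in this hunk).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheController;

public class InvalidateMetadataCacheControllerSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Wrap a stock controller; the delegate handles everything except priority.
        HBaseRpcController delegate = new RpcControllerFactory(conf).newController();
        InvalidateMetadataCacheController controller =
                new InvalidateMetadataCacheController(delegate, conf);

        // Priority is fixed at construction time from
        // IndexUtil.getInvalidateMetadataCachePriority(conf) ...
        System.out.println("priority = " + controller.getPriority());

        // ... and setPriority(TableName) is deliberately a no-op, so table-based
        // priority resolution cannot override the invalidate-cache priority.
        controller.setPriority(TableName.valueOf("SYSTEM.CATALOG"));
        System.out.println("priority unchanged = " + controller.getPriority());
    }
}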
InvalidateMetadataCacheControllerFactory.java (new file)
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc.controller;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;

/**
* Factory to instantiate InvalidateMetadataCacheController instances.
*/
public class InvalidateMetadataCacheControllerFactory extends RpcControllerFactory {
Contributor: Why didn't we consider using the ServerSideRpcControllerFactory instead of creating a new factory?

Contributor Author: I am not sure about this one.

public InvalidateMetadataCacheControllerFactory(Configuration conf) {
super(conf);
}

@Override
public HBaseRpcController newController() {
HBaseRpcController delegate = super.newController();
return getController(delegate);
}

private HBaseRpcController getController(HBaseRpcController delegate) {
return new InvalidateMetadataCacheController(delegate, conf);
}
}
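A hedged sketch of how a caller could plug this factory into the HBase RPC layer via the standard hbase.rpc.controllerfactory.class hook; where the PR actually wires the factory is not shown in this hunk, so treat this as illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheControllerFactory;

public class ControllerFactoryWiringSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Standard HBase hook for swapping in a custom RPC controller factory.
        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                InvalidateMetadataCacheControllerFactory.class.getName());

        // Connections built with this conf now hand out
        // InvalidateMetadataCacheController instances for outgoing RPCs.
        HBaseRpcController controller = RpcControllerFactory.instantiate(conf).newController();
        System.out.println(controller.getClass().getSimpleName());
    }
}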
ServerSideRPCControllerFactory.java
@@ -19,16 +19,13 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link RpcControllerFactory} that should only be used when
* making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables.
*/
public class ServerSideRPCControllerFactory {

private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class);
protected final Configuration conf;

public ServerSideRPCControllerFactory(Configuration conf) {
ServerMetadataCache.java (new file)
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.cache;

import java.sql.SQLException;

/**
* Interface for the server-side metadata cache hosted on each region server.
*/
public interface ServerMetadataCache {
long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
throws SQLException;
void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName);
}
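A small illustrative sketch of how a server-side consumer could use this interface to validate a client-supplied LAST_DDL_TIMESTAMP; the helper name and the way the client's timestamp reaches the region server are assumptions, not part of this change.

import java.sql.SQLException;

import org.apache.phoenix.cache.ServerMetadataCache;

public class DdlTimestampValidationSketch {

    /**
     * Returns true when the client's cached LAST_DDL_TIMESTAMP for the table is still
     * current according to the region server's ServerMetadataCache.
     * clientLastDdlTimestamp is whatever the client attached to its request; the
     * attribute name and plumbing are hypothetical here.
     */
    static boolean isClientMetadataCurrent(ServerMetadataCache cache, byte[] tenantId,
            byte[] schemaName, byte[] tableName, long clientLastDdlTimestamp)
            throws SQLException {
        long serverLastDdlTimestamp =
                cache.getLastDDLTimestampForTable(tenantId, schemaName, tableName);
        return clientLastDdlTimestamp >= serverLastDdlTimestamp;
    }
}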
ServerMetadataCacheImpl.java (new file)
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.cache;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
/**
* This manages the cache for all the objects (data tables, views, indexes) on each region server.
* Currently, it only stores LAST_DDL_TIMESTAMP in the cache.
*/
public class ServerMetadataCacheImpl implements ServerMetadataCache {

protected Configuration conf;
// key is the combination of <tenantID, schema name, table name>, value is the lastDDLTimestamp
protected final Cache<ImmutableBytesPtr, Long> lastDDLTimestampMap;
private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheImpl.class);
private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS =
"phoenix.coprocessor.regionserver.cache.ttl.ms";
// Keep the default cache expiry at 30 mins so that a stale entry cannot
// survive for more than 30 mins.
private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS
= 30 * 60 * 1000L; // 30 mins
private static final String PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE
= "phoenix.coprocessor.regionserver.cache.size";
private static final long DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE = 10000L;
private static volatile ServerMetadataCacheImpl cacheInstance;
private MetricsMetadataCachingSource metricsSource;

/**
* Creates/gets an instance of ServerMetadataCache.
*
* @param conf configuration
* @return cache
*/
public static ServerMetadataCacheImpl getInstance(Configuration conf) {
ServerMetadataCacheImpl result = cacheInstance;
if (result == null) {
synchronized (ServerMetadataCacheImpl.class) {
result = cacheInstance;
if (result == null) {
cacheInstance = result = new ServerMetadataCacheImpl(conf);
}
}
}
return result;
}

public ServerMetadataCacheImpl(Configuration conf) {
this.conf = HBaseConfiguration.create(conf);
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
.getInstance().getMetadataCachingSource();
long maxTTL = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS,
DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_TTL_MS);
long maxSize = conf.getLong(PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE,
DEFAULT_PHOENIX_COPROC_REGIONSERVER_CACHE_SIZE);
lastDDLTimestampMap = CacheBuilder.newBuilder()
.removalListener((RemovalListener<ImmutableBytesPtr, Long>) notification -> {
String key = notification.getKey().toString();
LOGGER.debug("Expiring " + key + " because of "
+ notification.getCause().name());
})
// maximum number of entries this cache can handle.
.maximumSize(maxSize)
.expireAfterAccess(maxTTL, TimeUnit.MILLISECONDS)
.build();
}

/**
* Returns the last DDL timestamp for the given table.
* If it is not found in the cache, query the SYSCAT region server and cache the result.
* @param tenantID tenant id
* @param schemaName schema name
* @param tableName table name
* @return last DDL timestamp
* @throws SQLException if resolving the table from SYSCAT fails
*/
public long getLastDDLTimestampForTable(byte[] tenantID, byte[] schemaName, byte[] tableName)
throws SQLException {
String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
// Lookup in cache if present.
Long lastDDLTimestamp = lastDDLTimestampMap.getIfPresent(tableKeyPtr);
if (lastDDLTimestamp != null) {
metricsSource.incrementRegionServerMetadataCacheHitCount();
LOGGER.trace("Retrieving last ddl timestamp value from cache for tableName: {}",
fullTableNameStr);
return lastDDLTimestamp;
}
metricsSource.incrementRegionServerMetadataCacheMissCount();
PTable table;
String tenantIDStr = Bytes.toString(tenantID);
if (tenantIDStr == null || tenantIDStr.isEmpty()) {
tenantIDStr = null;
}
Properties properties = new Properties();
if (tenantIDStr != null) {
properties.setProperty(TENANT_ID_ATTRIB, tenantIDStr);
}
try (Connection connection = getConnection(properties)) {
// Using PhoenixRuntime#getTableFromServerNoCache to completely bypass CQSI cache.
table = PhoenixRuntime.getTableFromServerNoCache(connection, schemaName, tableName);
// TODO PhoenixRuntime#getTableNoCache can throw TableNotFoundException.
// In that case, do we want to throw non retryable exception back to the client?
// Update cache with the latest DDL timestamp from SYSCAT server.
lastDDLTimestampMap.put(tableKeyPtr, table.getLastDDLTimestamp());
}
return table.getLastDDLTimestamp();
}

/**
* Invalidate cache for the given tenantID, schema name and table name.
* The Guava cache is thread-safe, so we don't have to synchronize it explicitly.
* @param tenantID tenantID
* @param schemaName schemaName
* @param tableName tableName
*/
public void invalidate(byte[] tenantID, byte[] schemaName, byte[] tableName) {
String fullTableNameStr = SchemaUtil.getTableName(schemaName, tableName);
LOGGER.debug("Invalidating server metadata cache for tenantID: {}, full table: {}",
Bytes.toString(tenantID), fullTableNameStr);
byte[] tableKey = SchemaUtil.getTableKey(tenantID, schemaName, tableName);
ImmutableBytesPtr tableKeyPtr = new ImmutableBytesPtr(tableKey);
lastDDLTimestampMap.invalidate(tableKeyPtr);
}

protected Connection getConnection(Properties properties) throws SQLException {
return QueryUtil.getConnectionOnServer(properties, this.conf);
}
}
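A hedged usage sketch of the implementation above. It assumes it runs inside a region server where Phoenix is deployed (a cache miss opens a server-side connection and resolves the table from SYSCAT), and MY_SCHEMA/MY_TABLE are hypothetical names.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerMetadataCacheImpl;

public class ServerMetadataCacheUsageSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Region-server-wide singleton; the same instance serves every region.
        ServerMetadataCacheImpl cache = ServerMetadataCacheImpl.getInstance(conf);

        byte[] tenantId = Bytes.toBytes("");          // global (no tenant)
        byte[] schema = Bytes.toBytes("MY_SCHEMA");   // hypothetical schema
        byte[] table = Bytes.toBytes("MY_TABLE");     // hypothetical table

        // First call misses the cache and resolves the table from SYSCAT via a
        // server-side connection; later calls are served from the cache until the
        // TTL/size limits or an explicit invalidation evict the entry.
        long lastDdlTimestamp = cache.getLastDDLTimestampForTable(tenantId, schema, table);
        System.out.println("LAST_DDL_TIMESTAMP = " + lastDdlTimestamp);

        // A DDL on MY_TABLE would trigger this on the region servers hosting it.
        cache.invalidate(tenantId, schema, table);
    }
}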
@@ -227,7 +227,9 @@ private void verifyIndexWhere(ParseNode indexWhere, StatementContext context,
}
public MutationPlan compile(final CreateIndexStatement create) throws SQLException {
final PhoenixConnection connection = statement.getConnection();
final ColumnResolver resolver = FromCompiler.getResolver(create, connection, create.getUdfParseNodes());
final ColumnResolver resolver
= FromCompiler.getResolverForCreateIndex(
create, connection, create.getUdfParseNodes());
Scan scan = new Scan();
final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
verifyIndexWhere(create.getWhere(), context, create.getTable().getName());
FromCompiler.java
@@ -180,7 +180,8 @@ public static ColumnResolver getResolverForCreation(final CreateTableStatement s
NamedTableNode tableNode = NamedTableNode.create(null, baseTable, Collections.<ColumnDef>emptyList());
// Always use non-tenant-specific connection here
try {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableNode, true);
SingleTableColumnResolver visitor
= new SingleTableColumnResolver(connection, tableNode, true, true);
return visitor;
} catch (TableNotFoundException e) {
// Used for mapped VIEW, since we won't be able to resolve that.
@@ -280,7 +281,8 @@ public static ColumnResolver getResolver(NamedTableNode tableNode, PhoenixConnec

public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection)
throws SQLException {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
SingleTableColumnResolver visitor
= new SingleTableColumnResolver(connection, statement.getTable(), true, true);
return visitor;
}

@@ -293,9 +295,14 @@ public static ColumnResolver getIndexResolver(SingleTableStatement statement,
}
}

public static ColumnResolver getResolver(SingleTableStatement statement, PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
public static ColumnResolver getResolverForCreateIndex(SingleTableStatement statement,
PhoenixConnection connection, Map<String, UDFParseNode> udfParseNodes)
throws SQLException {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true, 0, udfParseNodes);
// use alwaysHitServer=true to ensure the client's cache is up-to-date even when the
// client is validating LAST_DDL_TIMESTAMPs and UPDATE_CACHE_FREQUENCY is NEVER.
SingleTableColumnResolver visitor
= new SingleTableColumnResolver(connection, statement.getTable(), true, 0,
udfParseNodes, true, null);
return visitor;
}

@@ -289,7 +289,7 @@ public PhoenixStatement getStatement() {
}

public long getCurrentTime() throws SQLException {
long ts = this.getCurrentTable().getCurrentTime();
long ts = this.getCurrentTable().getTimeStamp();
// if the table is transactional then it is only resolved once per query, so we can't use the table timestamp
if (this.getCurrentTable().getTable().getType() != PTableType.SUBQUERY
&& this.getCurrentTable().getTable().getType() != PTableType.PROJECTED
UpsertCompiler.java
@@ -636,7 +636,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException {
// as max TS, so that the query can safely restarted and still work of a snapshot
// (so it won't see its own data in case of concurrent splits)
// see PHOENIX-4849
long serverTime = selectResolver.getTables().get(0).getCurrentTime();
long serverTime = selectResolver.getTables().get(0).getTimeStamp();
Contributor: What is the difference between getCurrentTime() and getTimeStamp()?

Contributor Author (@palashc, Jun 13, 2024):
@tkhurana Currently both of those methods return the same value, i.e. upperBoundTimeStamp, which is either -1 or whatever is provided to the TableRef constructor. Current code:

// if UPDATE_CACHE_FREQUENCY is set, always let the server set timestamps
this.upperBoundTimeStamp = table.getUpdateCacheFrequency() != 0 ? QueryConstants.UNSET_TIMESTAMP : upperBoundTimeStamp;
this.currentTime = this.upperBoundTimeStamp;

public long getTimeStamp() {
    return this.upperBoundTimeStamp;
}

public long getCurrentTime() {
    return this.currentTime;
}

Once we set UPDATE_CACHE_FREQUENCY to NEVER, currentTime was being set to -1 and features using currentTime were breaking (like the QueryOptimizer deciding whether a disabled index is under its usability threshold, or asyncCreatedDate during CREATE INDEX). The following change in TableRef keeps currentTime set to whatever is provided to the TableRef constructor, so features using currentTime can continue to use it. There is effectively no change in UpsertCompiler: it will still use the same value, i.e. TableRef.upperBoundTimeStamp.

this.currentTime = upperBoundTimeStamp;

if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
// if this is the first time this table is resolved the ref's current time might not be defined, yet
// in that case force an RPC to get the server time