diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java index 302089d8c50..cd04b4f5210 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java @@ -19,7 +19,6 @@ import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -37,15 +36,20 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PName; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PropertiesUtil; @@ -53,14 +57,17 @@ import org.apache.phoenix.util.TestUtil; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Verifies the number of RPC calls from {@link MetaDataClient} updateCache() * for transactional and non-transactional tables. */ public class UpdateCacheIT extends ParallelStatsDisabledIT { - - public static final int NUM_MILLIS_IN_DAY = 86400000; + + private static final Logger LOGGER = + LoggerFactory.getLogger(UpdateCacheIT.class); private static void setupSystemTable(String fullTableName) throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -104,10 +111,29 @@ public void testUpdateCacheForNeverUpdatedTable() throws Exception { String tableName = generateUniqueName(); String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + AtomicBoolean isSysMutexEmpty = new AtomicBoolean(true); + ExecutorService executorService = Executors.newFixedThreadPool(5, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("check-sys-mutex-count-%d").build()); + for (int i = 0; i < 5; i++) { + executorService.submit(new SystemMutexCaller(isSysMutexEmpty, + props, INDEX_DATA_SCHEMA, tableName)); + } + Thread.sleep(500); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA); - conn.createStatement().execute( - "alter table " + fullTableName + " SET UPDATE_CACHE_FREQUENCY=NEVER"); + conn.createStatement().execute("ALTER TABLE " + fullTableName + + " SET UPDATE_CACHE_FREQUENCY=NEVER"); + } + // make sure SYSTEM.MUTEX did not contain any record while + // ALTER TABLE SET query was being executed + assertTrue("Mutex should not have been acquired", isSysMutexEmpty.get()); + try { + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + // no action needed + LOGGER.debug("Error during ExecutorService shutdown"); } helpTestUpdateCache(fullTableName, new int[] {0, 0}, false); } @@ -136,14 +162,37 @@ public void testUpdateCacheForTimeLimitedUpdateTable() throws Exception { @Test public void testUpdateCacheForChangingUpdateTable() throws Exception { - String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName(); + String tableName = generateUniqueName(); + String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + + tableName; Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + " UPDATE_CACHE_FREQUENCY=never"); + conn.createStatement().execute("CREATE TABLE " + fullTableName + + TestUtil.TEST_TABLE_SCHEMA + " UPDATE_CACHE_FREQUENCY=never"); } helpTestUpdateCache(fullTableName, new int[] {0, 0}, false); + AtomicBoolean isSysMutexEmpty = new AtomicBoolean(true); + ExecutorService executorService = Executors.newFixedThreadPool(5, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("check-sys-mutex-count-%d").build()); + for (int i = 0; i < 5; i++) { + executorService.submit(new SystemMutexCaller(isSysMutexEmpty, + props, INDEX_DATA_SCHEMA, tableName)); + } + Thread.sleep(500); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET UPDATE_CACHE_FREQUENCY=ALWAYS"); + conn.createStatement().execute("ALTER TABLE " + fullTableName + + " SET UPDATE_CACHE_FREQUENCY=ALWAYS"); + } + // make sure SYSTEM.MUTEX did not contain any record while + // ALTER TABLE SET query was being executed + assertTrue("Mutex should not have been acquired", isSysMutexEmpty.get()); + try { + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + // no action needed + LOGGER.debug("Error during ExecutorService shutdown"); } helpTestUpdateCache(fullTableName, new int[] {1, 3}, false); } @@ -269,4 +318,58 @@ public void testInvalidConnUpdateCacheFrequencyShouldThrow() throws Exception { } } } + + /** + * Helper Runnable impl class that continuously keeps checking + * if SYSTEM.MUTEX contains any record until either interrupted or + * provided connection is closed + */ + private static class SystemMutexCaller implements Runnable { + + private final AtomicBoolean isSysMutexEmpty; + private final Properties props; + private final String schemaName; + private final String tableName; + + public SystemMutexCaller(final AtomicBoolean isSysMutexEmpty, + final Properties props, final String schemaName, + final String tableName) { + this.isSysMutexEmpty = isSysMutexEmpty; + this.props = props; + this.schemaName = schemaName; + this.tableName = tableName; + } + + @Override + public void run() { + try (Connection conn = DriverManager.getConnection(getUrl(), + props)) { + while (!Thread.interrupted() && !conn.isClosed()) { + try { + ResultSet resultSet = conn.createStatement().executeQuery( + "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME + + " WHERE TENANT_ID IS NULL AND TABLE_SCHEM='" + schemaName + + "' AND TABLE_NAME='" + tableName + + "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL"); + if (resultSet.next()) { + isSysMutexEmpty.set(false); + break; + } + } catch (SQLException e) { + // most likely conn closure + if (conn.isClosed()) { + Thread.currentThread().interrupt(); + } else { + LOGGER.error("Error while scanning " + + PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME + " , thread: " + + Thread.currentThread().getName(), e); + } + } + } + } catch (SQLException e) { + LOGGER.error("Connection access error. Thread: {}", + Thread.currentThread().getName(), e); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index e9200494e16..4b809116b0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4014,12 +4014,19 @@ public MutationState addColumn(PTable table, List origColumnDefs, } } - if (EncodedColumnsUtil.usesEncodedColumnNames(table)) { - // for tables that use column encoding acquire a mutex on the base table as we - // need to update the encoded column qualifier counter on the base table - acquiredBaseTableMutex = writeCell(null, physicalSchemaName, physicalTableName, null); + if (EncodedColumnsUtil.usesEncodedColumnNames(table) + && stmtProperties.isEmpty()) { + // For tables that use column encoding acquire a mutex on + // the base table as we need to update the encoded column + // qualifier counter on the base table. Not applicable to + // ALTER TABLE/VIEW SET statements because + // we don't update the column qualifier counter while + // setting property, hence the check: stmtProperties.isEmpty() + acquiredBaseTableMutex = writeCell(null, physicalSchemaName, + physicalTableName, null); if (!acquiredBaseTableMutex) { - throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName); + throw new ConcurrentTableMutationException( + physicalSchemaName, physicalTableName); } } for (PColumn pColumn : columns) {