Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-6082 : Avoid checkAndPut when altering properties for a table or view with column-encoding enabled #983

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 112 additions & 9 deletions phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
Expand All @@ -37,30 +36,38 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Verifies the number of RPC calls from {@link MetaDataClient} updateCache()
* for transactional and non-transactional tables.
*/
public class UpdateCacheIT extends ParallelStatsDisabledIT {

public static final int NUM_MILLIS_IN_DAY = 86400000;

private static final Logger LOGGER =
LoggerFactory.getLogger(UpdateCacheIT.class);

private static void setupSystemTable(String fullTableName) throws SQLException {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Expand Down Expand Up @@ -104,10 +111,29 @@ public void testUpdateCacheForNeverUpdatedTable() throws Exception {
String tableName = generateUniqueName();
String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
AtomicBoolean isSysMutexEmpty = new AtomicBoolean(true);
ExecutorService executorService = Executors.newFixedThreadPool(5,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("check-sys-mutex-count-%d").build());
for (int i = 0; i < 5; i++) {
executorService.submit(new SystemMutexCaller(isSysMutexEmpty,
props, INDEX_DATA_SCHEMA, tableName));
}
Thread.sleep(500);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("create table " + fullTableName + TestUtil.TEST_TABLE_SCHEMA);
conn.createStatement().execute(
"alter table " + fullTableName + " SET UPDATE_CACHE_FREQUENCY=NEVER");
conn.createStatement().execute("ALTER TABLE " + fullTableName
+ " SET UPDATE_CACHE_FREQUENCY=NEVER");
}
// make sure SYSTEM.MUTEX did not contain any record while
// ALTER TABLE SET <props> query was being executed
assertTrue("Mutex should not have been acquired", isSysMutexEmpty.get());
try {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (Exception e) {
// no action needed
LOGGER.debug("Error during ExecutorService shutdown");
}
helpTestUpdateCache(fullTableName, new int[] {0, 0}, false);
}
Expand Down Expand Up @@ -136,14 +162,37 @@ public void testUpdateCacheForTimeLimitedUpdateTable() throws Exception {

@Test
public void testUpdateCacheForChangingUpdateTable() throws Exception {
String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + generateUniqueName();
String tableName = generateUniqueName();
String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR
+ tableName;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + " UPDATE_CACHE_FREQUENCY=never");
conn.createStatement().execute("CREATE TABLE " + fullTableName
+ TestUtil.TEST_TABLE_SCHEMA + " UPDATE_CACHE_FREQUENCY=never");
}
helpTestUpdateCache(fullTableName, new int[] {0, 0}, false);
AtomicBoolean isSysMutexEmpty = new AtomicBoolean(true);
ExecutorService executorService = Executors.newFixedThreadPool(5,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("check-sys-mutex-count-%d").build());
for (int i = 0; i < 5; i++) {
executorService.submit(new SystemMutexCaller(isSysMutexEmpty,
props, INDEX_DATA_SCHEMA, tableName));
}
Thread.sleep(500);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET UPDATE_CACHE_FREQUENCY=ALWAYS");
conn.createStatement().execute("ALTER TABLE " + fullTableName
+ " SET UPDATE_CACHE_FREQUENCY=ALWAYS");
}
// make sure SYSTEM.MUTEX did not contain any record while
// ALTER TABLE SET <props> query was being executed
assertTrue("Mutex should not have been acquired", isSysMutexEmpty.get());
try {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (Exception e) {
// no action needed
LOGGER.debug("Error during ExecutorService shutdown");
}
helpTestUpdateCache(fullTableName, new int[] {1, 3}, false);
}
Expand Down Expand Up @@ -269,4 +318,58 @@ public void testInvalidConnUpdateCacheFrequencyShouldThrow() throws Exception {
}
}
}

/**
* Helper Runnable impl class that continuously keeps checking
* if SYSTEM.MUTEX contains any record until either interrupted or
* provided connection is closed
*/
private static class SystemMutexCaller implements Runnable {

private final AtomicBoolean isSysMutexEmpty;
private final Properties props;
private final String schemaName;
private final String tableName;

public SystemMutexCaller(final AtomicBoolean isSysMutexEmpty,
final Properties props, final String schemaName,
final String tableName) {
this.isSysMutexEmpty = isSysMutexEmpty;
this.props = props;
this.schemaName = schemaName;
this.tableName = tableName;
}

@Override
public void run() {
try (Connection conn = DriverManager.getConnection(getUrl(),
props)) {
while (!Thread.interrupted() && !conn.isClosed()) {
try {
ResultSet resultSet = conn.createStatement().executeQuery(
"SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME
+ " WHERE TENANT_ID IS NULL AND TABLE_SCHEM='" + schemaName
+ "' AND TABLE_NAME='" + tableName
+ "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL");
if (resultSet.next()) {
isSysMutexEmpty.set(false);
break;
}
} catch (SQLException e) {
// most likely conn closure
if (conn.isClosed()) {
Thread.currentThread().interrupt();
} else {
LOGGER.error("Error while scanning "
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME + " , thread: "
+ Thread.currentThread().getName(), e);
}
}
}
} catch (SQLException e) {
LOGGER.error("Connection access error. Thread: {}",
Thread.currentThread().getName(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4014,12 +4014,19 @@ public MutationState addColumn(PTable table, List<ColumnDef> origColumnDefs,
}
}

if (EncodedColumnsUtil.usesEncodedColumnNames(table)) {
// for tables that use column encoding acquire a mutex on the base table as we
// need to update the encoded column qualifier counter on the base table
acquiredBaseTableMutex = writeCell(null, physicalSchemaName, physicalTableName, null);
if (EncodedColumnsUtil.usesEncodedColumnNames(table)
&& stmtProperties.isEmpty()) {
// For tables that use column encoding acquire a mutex on
// the base table as we need to update the encoded column
// qualifier counter on the base table. Not applicable to
// ALTER TABLE/VIEW SET <property> statements because
// we don't update the column qualifier counter while
// setting property, hence the check: stmtProperties.isEmpty()
acquiredBaseTableMutex = writeCell(null, physicalSchemaName,
physicalTableName, null);
if (!acquiredBaseTableMutex) {
throw new ConcurrentTableMutationException(physicalSchemaName, physicalTableName);
throw new ConcurrentTableMutationException(
physicalSchemaName, physicalTableName);
}
}
for (PColumn pColumn : columns) {
Expand Down