diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java index d33d538723a..4358a12e1d3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java @@ -16,8 +16,29 @@ */ package org.apache.phoenix.end2end; -import com.google.common.base.Joiner; -import com.google.common.base.Throwables; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,34 +57,13 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.QueryUtil; import org.junit.After; import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.security.PrivilegedExceptionAction; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; @RunWith(Parameterized.class) public class BasePermissionsIT extends BaseTest { @@ -82,6 +82,10 @@ public class BasePermissionsIT extends BaseTest { static final String SYSTEM_SEQUENCE_IDENTIFIER = QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE+ "\""; + static final String SYSTEM_MUTEX_IDENTIFIER = + QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + + PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME + "\""; + static final Set PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(Arrays.asList( "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION")); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java index 3965f699b72..7f2964dee87 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ChangePermissionsIT.java @@ -57,7 +57,8 @@ private void grantSystemTableAccess(User superUser, User... users) throws Except } else { verifyAllowed(grantPermissions("RX", user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser); } - verifyAllowed(grantPermissions("W", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser); + verifyAllowed(grantPermissions("RWX", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser); + verifyAllowed(grantPermissions("RWX", user, SYSTEM_MUTEX_IDENTIFIER, false), superUser); } } @@ -69,6 +70,7 @@ private void revokeSystemTableAccess(User superUser, User... users) throws Excep verifyAllowed(revokePermissions(user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser); } verifyAllowed(revokePermissions(user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser); + verifyAllowed(revokePermissions(user, SYSTEM_MUTEX_IDENTIFIER, false), superUser); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java index d253f6ebb9a..f3edc58df12 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java @@ -54,7 +54,6 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -284,7 +283,7 @@ private void changeMutexLock(Properties clientProps, boolean acquire) throws SQL assertTrue(((ConnectionQueryServicesImpl) services) .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey)); } else { - ((ConnectionQueryServicesImpl) services).releaseUpgradeMutex(mutexRowKey); + services.deleteMutexCell(mutexRowKey); } } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java index 8666bb8426d..86a6b60163a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableDDLPermissionsIT.java @@ -20,7 +20,6 @@ import java.sql.Connection; import java.util.Collections; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.security.AccessDeniedException; @@ -54,6 +53,10 @@ private void grantSystemTableAccess() throws Exception { Action.READ, Action.EXEC); grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE, Action.READ, Action.EXEC); + grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE, + Action.READ, Action.EXEC); + grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE, + Action.READ, Action.EXEC); } else { grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC); @@ -64,6 +67,10 @@ private void grantSystemTableAccess() throws Exception { Action.READ, Action.EXEC); grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE, Action.READ, Action.EXEC); + grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE, + Action.READ, Action.EXEC); + grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE, + Action.READ, Action.EXEC); } } catch (Throwable e) { if (e instanceof Exception) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java index 2b866a5e998..f70bcc71f31 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java @@ -18,8 +18,7 @@ package org.apache.phoenix.end2end; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX; -import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED; +import static org.apache.phoenix.query.ConnectionQueryServicesImpl.MUTEX; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -425,20 +424,6 @@ public boolean isUpgradeRequired() { } } - private void putUnlockKVInSysMutex(byte[] row) throws Exception { - try (Connection conn = getConnection(false, null)) { - ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - try (HTableInterface sysMutexTable = services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) { - byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; - byte[] qualifier = UPGRADE_MUTEX; - Put put = new Put(row); - put.addColumn(family, qualifier, UPGRADE_MUTEX_UNLOCKED); - sysMutexTable.put(put); - sysMutexTable.flushCommits(); - } - } - } - @Test public void testAcquiringAndReleasingUpgradeMutex() throws Exception { ConnectionQueryServices services = null; @@ -446,7 +431,6 @@ public void testAcquiringAndReleasingUpgradeMutex() throws Exception { generateUniqueName()); try (Connection conn = getConnection(false, null)) { services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - putUnlockKVInSysMutex(mutexRowKey); assertTrue(((ConnectionQueryServicesImpl)services) .acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey)); try { @@ -456,8 +440,7 @@ public void testAcquiringAndReleasingUpgradeMutex() throws Exception { } catch (UpgradeInProgressException expected) { } - assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); - assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey)); + services.deleteMutexCell(mutexRowKey); } } @@ -471,7 +454,6 @@ public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Excep final byte[] mutexKey = Bytes.toBytes(generateUniqueName()); try (Connection conn = getConnection(false, null)) { services = conn.unwrap(PhoenixConnection.class).getQueryServices(); - putUnlockKVInSysMutex(mutexKey); FutureTask task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey)); FutureTask task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey)); Thread t1 = new Thread(task1); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java index fdfd75bbc8c..c4bf91b40dc 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java @@ -20,11 +20,13 @@ import static com.google.common.collect.Lists.newArrayListWithExpectedSize; import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MODIFY_VIEW_PK; import static org.apache.phoenix.exception.SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.analyzeTable; import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -43,6 +45,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -52,9 +61,9 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -67,6 +76,7 @@ import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.ColumnAlreadyExistsException; +import org.apache.phoenix.schema.ConcurrentTableMutationException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; @@ -74,6 +84,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -93,6 +104,17 @@ public class ViewIT extends SplitSystemCatalogIT { protected String tableDDLOptions; protected boolean transactional; + + private static final String FAILED_VIEWNAME = "FAILED_VIEW"; + private static final byte[] FAILED_ROWKEY_BYTES = + SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), Bytes.toBytes(FAILED_VIEWNAME)); + private static final String SLOW_VIEWNAME_PREFIX = "SLOW_VIEW"; + private static final byte[] SLOW_ROWKEY_PREFIX_BYTES = + SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), + Bytes.toBytes(SLOW_VIEWNAME_PREFIX)); + + private static volatile CountDownLatch latch1 = null; + private static volatile CountDownLatch latch2 = null; public ViewIT(boolean transactional) { StringBuilder optionBuilder = new StringBuilder(); @@ -114,7 +136,7 @@ public static void doSetup() throws Exception { Map props = Collections.emptyMap(); boolean splitSystemCatalog = (driver == null); Map serverProps = Maps.newHashMapWithExpectedSize(1); - serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); + serverProps.put("hbase.coprocessor.region.classes", TestMetaDataRegionObserver.class.getName()); serverProps.put("hbase.coprocessor.abortonerror", "false"); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(props.entrySet().iterator())); // Split SYSTEM.CATALOG once after the mini-cluster is started @@ -123,6 +145,49 @@ public static void doSetup() throws Exception { } } + public static class TestMetaDataRegionObserver extends BaseRegionObserver { + @Override + public void preBatchMutate(ObserverContext c, + MiniBatchOperationInProgress miniBatchOp) throws IOException { + if (shouldFail(c, miniBatchOp.getOperation(0))) { + // throwing anything other than instances of IOException result + // in this coprocessor being unloaded + // DoNotRetryIOException tells HBase not to retry this mutation + // multiple times + throw new DoNotRetryIOException(); + } else if (shouldSlowDown(c, miniBatchOp.getOperation(0))) { + // simulate a slow write to SYSTEM.CATALOG + if (latch1 != null) { + latch1.countDown(); + } + if (latch2 != null) { + try { + // wait till the second task is complete before completing the first task + boolean result = latch2.await(2, TimeUnit.MINUTES); + if (!result) { + throw new RuntimeException("Second task took took long to complete"); + } + } catch (InterruptedException e) { + } + } + } + } + + private boolean shouldFail(ObserverContext c, Mutation m) { + TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); + return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + && (Bytes.equals(FAILED_ROWKEY_BYTES, m.getRow())); + } + + private boolean shouldSlowDown(ObserverContext c, + Mutation m) { + TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); + byte[] rowKeyPrefix = Arrays.copyOf(m.getRow(), SLOW_ROWKEY_PREFIX_BYTES.length); + return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + && (Bytes.equals(SLOW_ROWKEY_PREFIX_BYTES, rowKeyPrefix)); + } + } + @Test public void testReadOnlyOnUpdatableView() throws Exception { String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); @@ -1274,28 +1339,156 @@ public void testChildViewCreationFails() throws Exception { PhoenixRuntime.getTableNoCache(conn, fullViewName2); } - private static final String FAILED_VIEWNAME = "FAILED_VIEW"; - private static final byte[] ROWKEY_TO_FAIL_BYTES = SchemaUtil.getTableKey(null, Bytes.toBytes(SCHEMA2), - Bytes.toBytes(FAILED_VIEWNAME)); - - public static class FailingRegionObserver extends SimpleRegionObserver { + @Test + public void testConcurrentViewCreationAndTableDrop() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); + String fullViewName1 = + SchemaUtil.getTableName(SCHEMA2, + SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName()); + String fullViewName2 = SchemaUtil.getTableName(SCHEMA3, generateUniqueName()); + latch1 = new CountDownLatch(1); + latch2 = new CountDownLatch(1); + String tableDdl = + "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + + tableDDLOptions; + conn.createStatement().execute(tableDdl); + + ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + return t; + } + }); + + // create the view in a separate thread (which will take some time + // to complete) + Future future = + executorService.submit(new CreateViewRunnable(fullTableName, fullViewName1)); + // wait till the thread makes the rpc to create the view + latch1.await(); + tableDdl = "DROP TABLE " + fullTableName; + try { + // drop table should fail as we are concurrently adding a view + conn.createStatement().execute(tableDdl); + fail("Creating a view while concurrently dropping the base table should fail"); + } catch (ConcurrentTableMutationException e) { + } + latch2.countDown(); + + Exception e = future.get(); + assertNull(e == null); + + // create another view to ensure that the cell used to prevent + // concurrent modifications was removed + String ddl = + "CREATE VIEW " + fullViewName2 + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 6"; + conn.createStatement().execute(ddl); + } + } + + @Test + public void testConcurrentAddColumn() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String fullTableName = SchemaUtil.getTableName(SCHEMA1, generateUniqueName()); + String fullViewName = + SchemaUtil.getTableName(SCHEMA2, + SLOW_VIEWNAME_PREFIX + "_" + generateUniqueName()); + // create base table + String tableDdl = + "CREATE TABLE " + fullTableName + " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)" + + tableDDLOptions; + conn.createStatement().execute(tableDdl); + // create a view + String ddl = + "CREATE VIEW " + fullViewName + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 6"; + conn.createStatement().execute(ddl); + + latch1 = new CountDownLatch(1); + latch2 = new CountDownLatch(1); + ExecutorService executorService = Executors.newFixedThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + return t; + } + }); + + // add a column to the view in a separate thread (which will take + // some time to complete) + Future future = executorService.submit(new AddColumnRunnable(fullViewName)); + // wait till the thread makes the rpc to create the view + boolean result = latch1.await(2, TimeUnit.MINUTES); + if (!result) { + fail("The create view rpc look too long"); + } + tableDdl = "ALTER TABLE " + fullTableName + " ADD v3 INTEGER"; + try { + // add the same column to the base table with a different type + conn.createStatement().execute(tableDdl); + fail("Creating a view while concurrently dropping the base table should fail"); + } catch (ConcurrentTableMutationException e) { + } + latch2.countDown(); + + Exception e = future.get(); + assertNull(e); + + // add a new column to the base table to ensure that the cell used + // to prevent concurrent modifications was removed + tableDdl = "ALTER TABLE " + fullTableName + " ADD v4 INTEGER"; + conn.createStatement().execute(tableDdl); + } + } + + private class CreateViewRunnable implements Callable { + private final String fullTableName; + private final String fullViewName; + + public CreateViewRunnable(String fullTableName, String fullViewName) { + this.fullTableName = fullTableName; + this.fullViewName = fullViewName; + } + @Override - public void preBatchMutate(ObserverContext c, - MiniBatchOperationInProgress miniBatchOp) throws IOException { - if (shouldFail(c, miniBatchOp.getOperation(0))) { - // throwing anything other than instances of IOException result - // in this coprocessor being unloaded - // DoNotRetryIOException tells HBase not to retry this mutation - // multiple times - throw new DoNotRetryIOException(); + public Exception call() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = + "CREATE VIEW " + fullViewName + " (v2 VARCHAR) AS SELECT * FROM " + + fullTableName + " WHERE k = 5"; + conn.createStatement().execute(ddl); + } catch (SQLException e) { + return e; } + return null; } + } - private boolean shouldFail(ObserverContext c, Mutation m) { - TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); - return tableName.equals(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) - && (Bytes.equals(ROWKEY_TO_FAIL_BYTES, m.getRow())); + private class AddColumnRunnable implements Callable { + private final String fullViewName; + + public AddColumnRunnable(String fullViewName) { + this.fullViewName = fullViewName; } + @Override + public Exception call() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "ALTER VIEW " + fullViewName + " ADD v3 CHAR(13)"; + conn.createStatement().execute(ddl); + } catch (SQLException e) { + return e; + } + return null; + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 6f8cbc00f13..8b8d0c1875f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -169,4 +169,18 @@ public enum Feature {LOCAL_INDEX, RENEW_LEASE}; public QueryLoggerDisruptor getQueryDisruptor(); public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider); + + /** + * Writes a cell to SYSTEM.MUTEX using checkAndPut to ensure only a single client can execute a + * particular task + * @param rowToLock the row key of the cell being written + * @return true if this client was able to successfully acquire the mutex + */ + public boolean writeMutexCell(byte[] rowKey) throws SQLException; + + /** + * Deletes a cell that was written to SYSTEM.MUTEX + * @param rowKey + */ + public void deleteMutexCell(byte[] rowKey) throws SQLException; } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 4c7630d830a..03b5ba0a30c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTableInterface; @@ -326,9 +327,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private final AtomicBoolean upgradeRequired = new AtomicBoolean(false); private final int maxConnectionsAllowed; private final boolean shouldThrottleNumConnections; - public static final byte[] UPGRADE_MUTEX = "UPGRADE_MUTEX".getBytes(); - public static final byte[] UPGRADE_MUTEX_LOCKED = "UPGRADE_MUTEX_LOCKED".getBytes(); - public static final byte[] UPGRADE_MUTEX_UNLOCKED = "UPGRADE_MUTEX_UNLOCKED".getBytes(); + public static final byte[] MUTEX = "MUTEX".getBytes(); + public static final byte[] MUTEX_LOCKED = "MUTEX_LOCKED".getBytes(); private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -2502,6 +2502,10 @@ private String setSystemLogDDLProperties(String ddl) { protected String getChildLinkDDL() { return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA); } + + protected String getMutexDDL() { + return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA); + } private String setSystemDDLProperties(String ddl) { return String.format(ddl, @@ -2673,13 +2677,6 @@ void createSysMutexTableIfNotExists(HBaseAdmin admin) throws IOException, SQLExc columnDesc.setTimeToLive(TTL_FOR_MUTEX); // Let mutex expire after some time tableDesc.addFamily(columnDesc); admin.createTable(tableDesc); - try (HTableInterface sysMutexTable = getTable(mutexTableName.getName())) { - byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, - PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE); - Put put = new Put(mutexRowKey); - put.addColumn(PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES, UPGRADE_MUTEX, UPGRADE_MUTEX_UNLOCKED); - sysMutexTable.put(put); - } } catch (IOException e) { if(!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class)) || !Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), org.apache.hadoop.hbase.TableNotFoundException.class))) { @@ -2687,13 +2684,6 @@ void createSysMutexTableIfNotExists(HBaseAdmin admin) throws IOException, SQLExc } else { throw e; } - }catch(PhoenixIOException e){ - if(e.getCause()!=null && e.getCause() instanceof AccessDeniedException) - { - //Ignore - }else{ - throw e; - } } } @@ -2723,6 +2713,9 @@ private void createOtherSystemTables(PhoenixConnection metaConnection, HBaseAdmi try { metaConnection.createStatement().executeUpdate(getChildLinkDDL()); } catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(getMutexDDL()); + } catch (TableAlreadyExistsException e) {} // Catch the IOException to log the error message and then bubble it up for the client to retry. try { createSysMutexTableIfNotExists(hbaseAdmin); @@ -3173,6 +3166,9 @@ public void upgradeSystemTables(final String url, final Properties props) throws try { metaConnection.createStatement().executeUpdate(getChildLinkDDL()); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} + try { + metaConnection.createStatement().executeUpdate(getMutexDDL()); + } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} // In case namespace mapping is enabled and system table to system namespace mapping is also enabled, // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work @@ -3467,61 +3463,59 @@ void ensureSystemTablesMigratedToSystemNamespace() public boolean acquireUpgradeMutex(long currentServerSideTableTimestamp, byte[] rowToLock) throws IOException, SQLException { Preconditions.checkArgument(currentServerSideTableTimestamp < MIN_SYSTEM_TABLE_TIMESTAMP); - byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes(); if(sysMutexPhysicalTableNameBytes == null) { throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp), getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); } + if (!writeMutexCell(rowToLock)) { + throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp), + getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); + } + return true; + } - try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) { - byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; - byte[] qualifier = UPGRADE_MUTEX; - byte[] oldValue = UPGRADE_MUTEX_UNLOCKED; - byte[] newValue = UPGRADE_MUTEX_LOCKED; - Put put = new Put(rowToLock); - put.addColumn(family, qualifier, newValue); - boolean acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, oldValue, put); - if (!acquired) { - /* - * Because of TTL on the SYSTEM_MUTEX_FAMILY, it is very much possible that the cell - * has gone away. So we need to retry with an old value of null. Note there is a small - * race condition here that between the two checkAndPut calls, it is possible that another - * request would have set the value back to UPGRADE_MUTEX_UNLOCKED. In that scenario this - * following checkAndPut would still return false even though the lock was available. - */ - acquired = sysMutexTable.checkAndPut(rowToLock, family, qualifier, null, put); - if (!acquired) { - throw new UpgradeInProgressException(getVersion(currentServerSideTableTimestamp), - getVersion(MIN_SYSTEM_TABLE_TIMESTAMP)); - } + @Override + public boolean writeMutexCell(byte[] rowToLock) throws SQLException { + try { + // at this point the system mutex table should have been created or + // an exception thrown + byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes(); + try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) { + byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; + byte[] qualifier = MUTEX; + byte[] value = MUTEX_LOCKED; + Put put = new Put(rowToLock); + put.addColumn(family, qualifier, value); + boolean checkAndPut = + sysMutexTable.checkAndPut(rowToLock, family, qualifier, null, put); + return checkAndPut; } - return true; + } catch (IOException e) { + throw ServerUtil.parseServerException(e); } } - @VisibleForTesting - public boolean releaseUpgradeMutex(byte[] mutexRowKey) throws IOException, SQLException { - boolean released = false; - - byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes(); - if(sysMutexPhysicalTableNameBytes == null) { - // We shouldn't never be really in this situation where neither SYSMUTEX or SYS:MUTEX exists - return true; - } + private void releaseUpgradeMutex(byte[] mutexRowKey) throws IOException, SQLException { + deleteMutexCell(mutexRowKey); + } - try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) { - byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; - byte[] qualifier = UPGRADE_MUTEX; - byte[] expectedValue = UPGRADE_MUTEX_LOCKED; - byte[] newValue = UPGRADE_MUTEX_UNLOCKED; - Put put = new Put(mutexRowKey); - put.addColumn(family, qualifier, newValue); - released = sysMutexTable.checkAndPut(mutexRowKey, family, qualifier, expectedValue, put); - } catch (Exception e) { - logger.warn("Release of upgrade mutex failed", e); + @Override + public void deleteMutexCell(byte[] mutexRowKey) throws SQLException { + try { + // at this point the system mutex table should have been created or + // an exception thrown + byte[] sysMutexPhysicalTableNameBytes = getSysMutexPhysicalTableNameBytes(); + try (HTableInterface sysMutexTable = getTable(sysMutexPhysicalTableNameBytes)) { + byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES; + byte[] qualifier = MUTEX; + Delete delete = new Delete(mutexRowKey); + delete.addColumn(family, qualifier); + sysMutexTable.delete(delete); + } + } catch (IOException e) { + throw ServerUtil.parseServerException(e); } - return released; } private byte[] getSysMutexPhysicalTableNameBytes() throws IOException, SQLException { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 5a462145eb9..f33d51aa491 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -180,6 +180,10 @@ private String setSystemLogDDLProperties(String ddl) { protected String getChildLinkDDL() { return setSystemDDLProperties(QueryConstants.CREATE_CHILD_LINK_METADATA); } + + protected String getMutexDDL() { + return setSystemDDLProperties(QueryConstants.CREATE_MUTEX_METADTA); + } private String setSystemDDLProperties(String ddl) { return String.format(ddl, @@ -379,6 +383,11 @@ public void init(String url, Properties props) throws SQLException { .executeUpdate(getChildLinkDDL()); } catch (NewerTableAlreadyExistsException ignore) { } + try { + metaConnection.createStatement() + .executeUpdate(getMutexDDL()); + } catch (NewerTableAlreadyExistsException ignore) { + } } catch (SQLException e) { sqlE = e; } finally { @@ -730,4 +739,13 @@ public QueryLoggerDisruptor getQueryDisruptor() { public PhoenixTransactionClient initTransactionClient(Provider provider) { return null; // Client is not necessary } + + @Override + public boolean writeMutexCell(byte[] rowKey) throws SQLException { + return true; + } + + @Override + public void deleteMutexCell(byte[] rowKey) throws SQLException { + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index b3e2cb2644d..8e4030be89f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -370,4 +370,13 @@ public QueryLoggerDisruptor getQueryDisruptor() { public PhoenixTransactionClient initTransactionClient(Provider provider) { return getDelegate().initTransactionClient(provider); } + + @Override + public boolean writeMutexCell(byte[] rowKey) throws SQLException { + return true; + } + + @Override + public void deleteMutexCell(byte[] rowKey) throws SQLException { + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 8d8d47f9e80..32fedc8ea60 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -355,5 +355,19 @@ public enum JoinType {INNER, LEFT_OUTER} + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=%s,\n" + HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; + + public static final String CREATE_MUTEX_METADTA = + "CREATE IMMUTABLE TABLE " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_MUTEX_TABLE_NAME + "\"(\n" + + // Pk columns + TENANT_ID + " VARCHAR NULL," + + TABLE_SCHEM + " VARCHAR NULL," + + TABLE_NAME + " VARCHAR NOT NULL," + + COLUMN_NAME + " VARCHAR NULL," + // null for table row + COLUMN_FAMILY + " VARCHAR NULL " + // using for CF to uniqueness for columns + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + + HConstants.VERSIONS + "=%s,\n" + + HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" + + PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 625d03f634a..755933672d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1874,6 +1874,35 @@ private static boolean checkAndValidateRowTimestampCol(ColumnDef colDef, Primary } return false; } + + /** + * If we are creating a view we write a cell to the SYSTEM.MUTEX table with the rowkey of the + * parent table to prevent concurrent modifications + */ + private void writeCell(String tenantId, String schemaName, String tableName, String columnName) + throws SQLException { + byte[] rowKey = + columnName != null + ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName, null) + : SchemaUtil.getTableKey(tenantId, schemaName, tableName); + boolean success = connection.getQueryServices().writeMutexCell(rowKey); + if (!success) { + throw new ConcurrentTableMutationException(schemaName, tableName); + } + } + + /** + * Remove the cell that was written to to the SYSTEM.MUTEX table with the rowkey of the + * parent table to prevent concurrent modifications + */ + private void deleteCell(String tenantId, String schemaName, String tableName, String columnName) + throws SQLException { + byte[] rowKey = + columnName != null + ? SchemaUtil.getColumnKey(tenantId, schemaName, tableName, columnName, null) + : SchemaUtil.getTableKey(tenantId, schemaName, tableName); + connection.getQueryServices().deleteMutexCell(rowKey); + } private PTable createTableInternal(CreateTableStatement statement, byte[][] splits, final PTable parent, String viewStatement, ViewType viewType, @@ -1913,6 +1942,18 @@ private PTable createTableInternal(CreateTableStatement statement, byte[][] spli boolean isLocalIndex = indexType == IndexType.LOCAL; QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS; ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN; + + if (tableType == PTableType.VIEW) { + PName physicalName = parent.getPhysicalName(); + String physicalSchemaName = + SchemaUtil.getSchemaNameFromFullName(physicalName.getString()); + String physicalTableName = + SchemaUtil.getTableNameFromFullName(physicalName.getString()); + // acquire the mutex using the global physical table name to + // prevent creating views while concurrently dropping the base + // table + writeCell(null, physicalSchemaName, physicalTableName, null); + } if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider); storeNulls = parent.getStoreNulls(); @@ -2833,6 +2874,14 @@ public boolean isViewReferenced() { } } finally { connection.setAutoCommit(wasAutoCommit); + if (tableType == PTableType.VIEW) { + PName physicalName = parent.getPhysicalName(); + String physicalSchemaName = + SchemaUtil.getSchemaNameFromFullName(physicalName.getString()); + String physicalTableName = + SchemaUtil.getTableNameFromFullName(physicalName.getString()); + deleteCell(null, physicalSchemaName, physicalTableName, null); + } } } @@ -2942,9 +2991,9 @@ MutationState dropTable(String schemaName, String tableName, String parentTableN boolean ifExists, boolean cascade, boolean skipAddingParentColumns) throws SQLException { connection.rollback(); boolean wasAutoCommit = connection.getAutoCommit(); + PName tenantId = connection.getTenantId(); + String tenantIdStr = tenantId == null ? null : tenantId.getString(); try { - PName tenantId = connection.getTenantId(); - String tenantIdStr = tenantId == null ? null : tenantId.getString(); byte[] key = SchemaUtil.getTableKey(tenantIdStr, schemaName, tableName); Long scn = connection.getSCN(); long clientTimeStamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; @@ -2957,6 +3006,11 @@ MutationState dropTable(String schemaName, String tableName, String parentTableN Delete linkDelete = new Delete(linkKey, clientTimeStamp); tableMetaData.add(linkDelete); } + if (tableType == PTableType.TABLE) { + // acquire a mutex on the table to prevent creating views while concurrently + // dropping the base table + writeCell(null, schemaName, tableName, null); + } MetaDataMutationResult result = connection.getQueryServices().dropTable(tableMetaData, tableType, cascade, skipAddingParentColumns); MutationCode code = result.getMutationCode(); PTable table = result.getTable(); @@ -3034,6 +3088,10 @@ MutationState dropTable(String schemaName, String tableName, String parentTableN return new MutationState(0, 0, connection); } finally { connection.setAutoCommit(wasAutoCommit); + // lock the table to prevent concurrent table modifications + if (tableType == PTableType.TABLE) { + deleteCell(null, schemaName, tableName, null); + } } } @@ -3252,11 +3310,12 @@ public MutationState addColumn(PTable table, List origColumnDefs, throws SQLException { connection.rollback(); boolean wasAutoCommit = connection.getAutoCommit(); + List columns = Lists.newArrayListWithExpectedSize(origColumnDefs != null ? origColumnDefs.size() : 0); + PName tenantId = connection.getTenantId(); + String schemaName = table.getSchemaName().getString(); + String tableName = table.getTableName().getString(); try { connection.setAutoCommit(false); - PName tenantId = connection.getTenantId(); - String schemaName = table.getSchemaName().getString(); - String tableName = table.getTableName().getString(); List columnDefs = null; if (table.isAppendOnlySchema()) { @@ -3337,7 +3396,6 @@ public MutationState addColumn(PTable table, List origColumnDefs, boolean willBeTxnl = metaProperties.getNonTxToTx(); Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || willBeTxnl, table.isTransactional() ? table.getTransactionProvider() : metaPropertiesEvaluated.getTransactionProvider()); int numPkColumnsAdded = 0; - List columns = Lists.newArrayListWithExpectedSize(numCols); Set colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>(); Set families = new LinkedHashSet<>(); PTable tableForCQCounters = tableType == PTableType.VIEW ? PhoenixRuntime.getTable(connection, table.getPhysicalName().getString()) : table; @@ -3534,6 +3592,18 @@ public MutationState addColumn(PTable table, List origColumnDefs, } } + for (PColumn pColumn : columns) { + PName physicalName = table.getPhysicalName(); + String physicalSchemaName = + SchemaUtil.getSchemaNameFromFullName(physicalName.getString()); + String physicalTableName = + SchemaUtil.getTableNameFromFullName(physicalName.getString()); + // acquire the mutex using the global physical table name to + // prevent creating the same column on a table or view with + // a conflicting type etc + writeCell(null, physicalSchemaName, physicalTableName, + pColumn.getName().getString()); + } MetaDataMutationResult result = connection.getQueryServices().addColumn(tableMetaData, table, properties, colFamiliesForPColumnsToBeAdded, columns); try { MutationCode code = processMutationResult(schemaName, tableName, result); @@ -3604,6 +3674,17 @@ public MutationState addColumn(PTable table, List origColumnDefs, } } finally { connection.setAutoCommit(wasAutoCommit); + if (!columns.isEmpty()) { + for (PColumn pColumn : columns) { + PName physicalName = table.getPhysicalName(); + String physicalSchemaName = + SchemaUtil.getSchemaNameFromFullName(physicalName.getString()); + String physicalTableName = + SchemaUtil.getTableNameFromFullName(physicalName.getString()); + deleteCell(null, physicalSchemaName, physicalTableName, + pColumn.getName().getString()); + } + } } }