Skip to content

Commit

Permalink
PHOENIX-4799 Write cells using checkAndMutate to prevent conflicting …
Browse files Browse the repository at this point in the history
…changes
  • Loading branch information
twdsilva committed Jul 25, 2018
1 parent 26d2460 commit 65cc9b1
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,29 @@
*/
package org.apache.phoenix.end2end;

import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -36,34 +57,13 @@
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;

@RunWith(Parameterized.class)
public class BasePermissionsIT extends BaseTest {
Expand All @@ -82,6 +82,10 @@ public class BasePermissionsIT extends BaseTest {
static final String SYSTEM_SEQUENCE_IDENTIFIER =
QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\"" + PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_TABLE+ "\"";

static final String SYSTEM_MUTEX_IDENTIFIER =
QueryConstants.SYSTEM_SCHEMA_NAME + "." + "\""
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME + "\"";

static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new HashSet<>(Arrays.asList(
"SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:FUNCTION"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ private void grantSystemTableAccess(User superUser, User... users) throws Except
} else {
verifyAllowed(grantPermissions("RX", user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
}
verifyAllowed(grantPermissions("W", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
verifyAllowed(grantPermissions("RWX", user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
verifyAllowed(grantPermissions("RWX", user, SYSTEM_MUTEX_IDENTIFIER, false), superUser);
}
}

Expand All @@ -69,6 +70,7 @@ private void revokeSystemTableAccess(User superUser, User... users) throws Excep
verifyAllowed(revokePermissions(user, PHOENIX_SYSTEM_TABLES_IDENTIFIERS, false), superUser);
}
verifyAllowed(revokePermissions(user, SYSTEM_SEQUENCE_IDENTIFIER, false), superUser);
verifyAllowed(revokePermissions(user, SYSTEM_MUTEX_IDENTIFIER, false), superUser);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -284,7 +283,7 @@ private void changeMutexLock(Properties clientProps, boolean acquire) throws SQL
assertTrue(((ConnectionQueryServicesImpl) services)
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_MIGRATION_TIMESTAMP, mutexRowKey));
} else {
((ConnectionQueryServicesImpl) services).releaseUpgradeMutex(mutexRowKey);
services.deleteMutexCell(mutexRowKey);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.sql.Connection;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.security.AccessDeniedException;
Expand Down Expand Up @@ -54,6 +53,10 @@ private void grantSystemTableAccess() throws Exception {
Action.READ, Action.EXEC);
grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
Action.READ, Action.EXEC);
grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE,
Action.READ, Action.EXEC);
grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:MUTEX"), Action.WRITE,
Action.READ, Action.EXEC);

} else {
grantPermissions(regularUser1.getShortName(), PHOENIX_SYSTEM_TABLES, Action.READ, Action.EXEC);
Expand All @@ -64,6 +67,10 @@ private void grantSystemTableAccess() throws Exception {
Action.READ, Action.EXEC);
grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM:SEQUENCE"), Action.WRITE,
Action.READ, Action.EXEC);
grantPermissions(regularUser1.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE,
Action.READ, Action.EXEC);
grantPermissions(unprivilegedUser.getShortName(), Collections.singleton("SYSTEM.MUTEX"), Action.WRITE,
Action.READ, Action.EXEC);
}
} catch (Throwable e) {
if (e instanceof Exception) {
Expand Down
22 changes: 2 additions & 20 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/UpgradeIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.phoenix.end2end;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.UPGRADE_MUTEX_UNLOCKED;
import static org.apache.phoenix.query.ConnectionQueryServicesImpl.MUTEX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -425,28 +424,13 @@ public boolean isUpgradeRequired() {
}
}

private void putUnlockKVInSysMutex(byte[] row) throws Exception {
try (Connection conn = getConnection(false, null)) {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
try (HTableInterface sysMutexTable = services.getTable(PhoenixDatabaseMetaData.SYSTEM_MUTEX_NAME_BYTES)) {
byte[] family = PhoenixDatabaseMetaData.SYSTEM_MUTEX_FAMILY_NAME_BYTES;
byte[] qualifier = UPGRADE_MUTEX;
Put put = new Put(row);
put.addColumn(family, qualifier, UPGRADE_MUTEX_UNLOCKED);
sysMutexTable.put(put);
sysMutexTable.flushCommits();
}
}
}

@Test
public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
ConnectionQueryServices services = null;
byte[] mutexRowKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
generateUniqueName());
try (Connection conn = getConnection(false, null)) {
services = conn.unwrap(PhoenixConnection.class).getQueryServices();
putUnlockKVInSysMutex(mutexRowKey);
assertTrue(((ConnectionQueryServicesImpl)services)
.acquireUpgradeMutex(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0, mutexRowKey));
try {
Expand All @@ -456,8 +440,7 @@ public void testAcquiringAndReleasingUpgradeMutex() throws Exception {
} catch (UpgradeInProgressException expected) {

}
assertTrue(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
assertFalse(((ConnectionQueryServicesImpl)services).releaseUpgradeMutex(mutexRowKey));
services.deleteMutexCell(mutexRowKey);
}
}

Expand All @@ -471,7 +454,6 @@ public void testConcurrentUpgradeThrowsUprgadeInProgressException() throws Excep
final byte[] mutexKey = Bytes.toBytes(generateUniqueName());
try (Connection conn = getConnection(false, null)) {
services = conn.unwrap(PhoenixConnection.class).getQueryServices();
putUnlockKVInSysMutex(mutexKey);
FutureTask<Void> task1 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus1, services, latch, numExceptions, mutexKey));
FutureTask<Void> task2 = new FutureTask<>(new AcquireMutexRunnable(mutexStatus2, services, latch, numExceptions, mutexKey));
Thread t1 = new Thread(task1);
Expand Down
Loading

0 comments on commit 65cc9b1

Please sign in to comment.