Skip to content

Commit

Permalink
PHOENIX-2968 Minimize RPCs for ALTER statement over APPEND_ONLY_SCHEMA
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas D'Silva committed Jun 29, 2016
1 parent bbfb1e3 commit d1cde87
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 46 deletions.
Expand Up @@ -27,17 +27,19 @@
import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.anySetOf;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;


import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;


Expand All @@ -47,6 +49,7 @@
import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.ColumnAlreadyExistsException;
import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable;
Expand All @@ -59,7 +62,7 @@


public class AppendOnlySchemaIT extends BaseHBaseManagedTimeIT { public class AppendOnlySchemaIT extends BaseHBaseManagedTimeIT {


private void createTableWithSameSchema(boolean notExists, boolean sameClient) throws Exception { private void testTableWithSameSchema(boolean notExists, boolean sameClient) throws Exception {
// use a spyed ConnectionQueryServices so we can verify calls to getTable // use a spyed ConnectionQueryServices so we can verify calls to getTable
ConnectionQueryServices connectionQueryServices = ConnectionQueryServices connectionQueryServices =
Mockito.spy(driver.getConnectionQueryServices(getUrl(), Mockito.spy(driver.getConnectionQueryServices(getUrl(),
Expand All @@ -77,7 +80,7 @@ private void createTableWithSameSchema(boolean notExists, boolean sameClient) th
// create view // create view
String ddl = String ddl =
"CREATE VIEW " + (notExists ? "IF NOT EXISTS" : "") "CREATE VIEW " + (notExists ? "IF NOT EXISTS" : "")
+ " view1( hostName varchar NOT NULL," + " view1( hostName varchar NOT NULL, tagName varChar"
+ " CONSTRAINT HOSTNAME_PK PRIMARY KEY (hostName))" + " CONSTRAINT HOSTNAME_PK PRIMARY KEY (hostName))"
+ " AS SELECT * FROM metric_table" + " AS SELECT * FROM metric_table"
+ " APPEND_ONLY_SCHEMA = true, UPDATE_CACHE_FREQUENCY=300000"; + " APPEND_ONLY_SCHEMA = true, UPDATE_CACHE_FREQUENCY=300000";
Expand All @@ -86,7 +89,7 @@ private void createTableWithSameSchema(boolean notExists, boolean sameClient) th
conn1.commit(); conn1.commit();
reset(connectionQueryServices); reset(connectionQueryServices);


// execute same ddl // execute same create ddl
try { try {
conn2.createStatement().execute(ddl); conn2.createStatement().execute(ddl);
if (!notExists) { if (!notExists) {
Expand All @@ -100,12 +103,31 @@ private void createTableWithSameSchema(boolean notExists, boolean sameClient) th
} }


// verify getTable rpcs // verify getTable rpcs
verify(connectionQueryServices, sameClient ? never() : atMost(1)).getTable((PName)isNull(), eq(new byte[0]), eq(Bytes.toBytes("VIEW1")), anyLong(), anyLong()); verify(connectionQueryServices, sameClient ? never() : times(1)).getTable((PName)isNull(), eq(new byte[0]), eq(Bytes.toBytes("VIEW1")), anyLong(), anyLong());


// verify create table rpcs // verify no create table rpcs
verify(connectionQueryServices, never()).createTable(anyListOf(Mutation.class), verify(connectionQueryServices, never()).createTable(anyListOf(Mutation.class),
any(byte[].class), any(PTableType.class), anyMap(), anyList(), any(byte[][].class), any(byte[].class), any(PTableType.class), anyMap(), anyList(), any(byte[][].class),
eq(false)); eq(false));
reset(connectionQueryServices);

// execute alter table ddl that adds the same column
ddl = "ALTER VIEW view1 ADD " + (notExists ? "IF NOT EXISTS" : "") + " tagName varchar";
try {
conn2.createStatement().execute(ddl);
if (!notExists) {
fail("Alter Table should fail");
}
}
catch (ColumnAlreadyExistsException e) {
if (notExists) {
fail("Alter Table should not fail");
}
}

// if not verify exists is true one call to add column table with empty mutation list (which does not make a rpc)
// else verify no add column calls
verify(connectionQueryServices, notExists ? times(1) : never() ).addColumn(eq(Collections.<Mutation>emptyList()), any(PTable.class), anyMap(), anySetOf(String.class));


// upsert one row // upsert one row
conn2.createStatement().execute("UPSERT INTO view1(hostName, metricVal) VALUES('host2', 2.0)"); conn2.createStatement().execute("UPSERT INTO view1(hostName, metricVal) VALUES('host2', 2.0)");
Expand Down Expand Up @@ -135,25 +157,25 @@ private void createTableWithSameSchema(boolean notExists, boolean sameClient) th


@Test @Test
public void testSameSchemaWithNotExistsSameClient() throws Exception { public void testSameSchemaWithNotExistsSameClient() throws Exception {
createTableWithSameSchema(true, true); testTableWithSameSchema(true, true);
} }


@Test @Test
public void testSameSchemaWithNotExistsDifferentClient() throws Exception { public void testSameSchemaWithNotExistsDifferentClient() throws Exception {
createTableWithSameSchema(true, false); testTableWithSameSchema(true, false);
} }


@Test @Test
public void testSameSchemaSameClient() throws Exception { public void testSameSchemaSameClient() throws Exception {
createTableWithSameSchema(false, true); testTableWithSameSchema(false, true);
} }


@Test @Test
public void testSameSchemaDifferentClient() throws Exception { public void testSameSchemaDifferentClient() throws Exception {
createTableWithSameSchema(false, false); testTableWithSameSchema(false, false);
} }


private void createTableAddColumns(boolean sameClient) throws Exception { private void testAddColumns(boolean sameClient) throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn1 = DriverManager.getConnection(getUrl(), props); try (Connection conn1 = DriverManager.getConnection(getUrl(), props);
Connection conn2 = sameClient ? conn1 : DriverManager.getConnection(getUrl(), props)) { Connection conn2 = sameClient ? conn1 : DriverManager.getConnection(getUrl(), props)) {
Expand Down Expand Up @@ -243,13 +265,13 @@ private void createTableAddColumns(boolean sameClient) throws Exception {
} }


@Test @Test
public void testCreateTableAddColumnsSameClient() throws Exception { public void testAddColumnsSameClient() throws Exception {
createTableAddColumns(true); testAddColumns(true);
} }


@Test @Test
public void testCreateTableAddColumnsDifferentClient() throws Exception { public void testTableAddColumnsDifferentClient() throws Exception {
createTableAddColumns(false); testAddColumns(false);
} }


public void testCreateTableDropColumns() throws Exception { public void testCreateTableDropColumns() throws Exception {
Expand Down
Expand Up @@ -907,40 +907,14 @@ public MutationState createTable(CreateTableStatement statement, byte[][] splits
List<ColumnDef> columnDefs = statement.getColumnDefs(); List<ColumnDef> columnDefs = statement.getColumnDefs();
PrimaryKeyConstraint pkConstraint = statement.getPrimaryKeyConstraint(); PrimaryKeyConstraint pkConstraint = statement.getPrimaryKeyConstraint();
// get the list of columns to add // get the list of columns to add
List<ColumnDef> newColumnDefs = Lists.newArrayList();
for (ColumnDef columnDef : columnDefs) { for (ColumnDef columnDef : columnDefs) {
if (pkConstraint.contains(columnDef.getColumnDefName())) { if (pkConstraint.contains(columnDef.getColumnDefName())) {
columnDef.setIsPK(true); columnDef.setIsPK(true);
} }
String familyName = columnDef.getColumnDefName().getFamilyName();
String columnName = columnDef.getColumnDefName().getColumnName();
if (familyName!=null) {
try {
PColumnFamily columnFamily = table.getColumnFamily(familyName);
columnFamily.getColumn(columnName);
}
catch (ColumnFamilyNotFoundException | ColumnNotFoundException e){
newColumnDefs.add(columnDef);
}
}
else {
try {
table.getColumn(columnName);
}
catch (ColumnNotFoundException e){
newColumnDefs.add(columnDef);
}
}
} }
// if there are new columns to add // if there are new columns to add
if (!newColumnDefs.isEmpty()) { return addColumn(table, columnDefs, statement.getProps(), statement.ifNotExists(),
return addColumn(table, newColumnDefs, statement.getProps(), true, NamedTableNode.create(statement.getTableName()), statement.getTableType());
statement.ifNotExists(), true,
NamedTableNode.create(statement.getTableName()), statement.getTableType());
}
else {
return new MutationState(0,connection);
}
} }
} }
table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null, null, tableProps, commonFamilyProps); table = createTableInternal(statement, splits, parent, viewStatement, viewType, viewColumnConstants, isViewColumnReferenced, null, null, null, tableProps, commonFamilyProps);
Expand Down Expand Up @@ -2804,7 +2778,7 @@ public MutationState addColumn(AddColumnStatement statement) throws SQLException
return addColumn(table, statement.getColumnDefs(), statement.getProps(), statement.ifNotExists(), false, statement.getTable(), statement.getTableType()); return addColumn(table, statement.getColumnDefs(), statement.getProps(), statement.ifNotExists(), false, statement.getTable(), statement.getTableType());
} }


public MutationState addColumn(PTable table, List<ColumnDef> columnDefs, public MutationState addColumn(PTable table, List<ColumnDef> origColumnDefs,
ListMultimap<String, Pair<String, Object>> stmtProperties, boolean ifNotExists, ListMultimap<String, Pair<String, Object>> stmtProperties, boolean ifNotExists,
boolean removeTableProps, NamedTableNode namedTableNode, PTableType tableType) boolean removeTableProps, NamedTableNode namedTableNode, PTableType tableType)
throws SQLException { throws SQLException {
Expand All @@ -2824,8 +2798,40 @@ public MutationState addColumn(PTable table, List<ColumnDef> columnDefs,
Long updateCacheFrequencyProp = null; Long updateCacheFrequencyProp = null;


Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size()); Map<String, List<Pair<String, Object>>> properties = new HashMap<>(stmtProperties.size());
if (columnDefs == null) { List<ColumnDef> columnDefs = null;
columnDefs = Collections.emptyList(); if (table.isAppendOnlySchema()) {
// only make the rpc if we are adding new columns
columnDefs = Lists.newArrayList();
for (ColumnDef columnDef : origColumnDefs) {
String familyName = columnDef.getColumnDefName().getFamilyName();
String columnName = columnDef.getColumnDefName().getColumnName();
if (familyName!=null) {
try {
PColumnFamily columnFamily = table.getColumnFamily(familyName);
columnFamily.getColumn(columnName);
if (!ifNotExists) {
throw new ColumnAlreadyExistsException(schemaName, tableName, columnName);
}
}
catch (ColumnFamilyNotFoundException | ColumnNotFoundException e){
columnDefs.add(columnDef);
}
}
else {
try {
table.getColumn(columnName);
if (!ifNotExists) {
throw new ColumnAlreadyExistsException(schemaName, tableName, columnName);
}
}
catch (ColumnNotFoundException e){
columnDefs.add(columnDef);
}
}
}
}
else {
columnDefs = origColumnDefs == null ? Collections.<ColumnDef>emptyList() : origColumnDefs;
} }
for (String family : stmtProperties.keySet()) { for (String family : stmtProperties.keySet()) {
List<Pair<String, Object>> origPropsList = stmtProperties.get(family); List<Pair<String, Object>> origPropsList = stmtProperties.get(family);
Expand Down

0 comments on commit d1cde87

Please sign in to comment.