Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-6218 Rows deleted count is incorrect for immutable tables with indexes #961

Merged
merged 3 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
Expand Down Expand Up @@ -185,7 +187,7 @@ public void testDropIfImmutableKeyValueColumn() throws Exception {

conn.setAutoCommit(true);
String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
conn.createStatement().execute(dml);
assertEquals(1, conn.createStatement().executeUpdate(dml));

rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName);
assertTrue(rs.next());
Expand Down Expand Up @@ -227,7 +229,7 @@ public void testDeleteFromPartialPK() throws Exception {
assertEquals(3, rs.getInt(1));

String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
conn.createStatement().execute(dml);
assertEquals(1, conn.createStatement().executeUpdate(dml));
assertIndexMutations(conn);
conn.commit();

Expand Down Expand Up @@ -269,7 +271,7 @@ public void testDeleteFromNonPK() throws Exception {
assertEquals(3, rs.getInt(1));

String dml = "DELETE from " + fullTableName + " WHERE varchar_col1='varchar_a' AND varchar_pk='varchar1'";
conn.createStatement().execute(dml);
assertEquals(1, conn.createStatement().executeUpdate(dml));
assertIndexMutations(conn);
conn.commit();

Expand Down Expand Up @@ -630,4 +632,181 @@ public Thread newThread(Runnable r) {
}
}

@Test
public void testDeleteCount_PK() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + dataTableFullName + " WHERE ID > 5");
assertEquals(5, deleteStmt.executeUpdate());
conn.commit();
}
}

@Test
public void testDeleteCount_nonPK() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();
String indexTableName2 = "IND_" + generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL2) INCLUDE (VAL1)", indexTableName2, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + dataTableFullName + " WHERE VAL1 > 6");
assertEquals(5, deleteStmt.executeUpdate());
conn.commit();
}
}

@Test
public void testDeleteCount_limit() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();
String indexTableName2 = "IND_" + generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL2) INCLUDE (VAL1)", indexTableName2, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + dataTableFullName + " WHERE VAL1 > 6 LIMIT 3");
assertEquals(3, deleteStmt.executeUpdate());
conn.commit();
}
}

@Test
public void testDeleteCount_noCoveredColumn() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();
String indexTableName2 = "IND_" + generateUniqueName();
tkhurana marked this conversation as resolved.
Show resolved Hide resolved

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1)", indexTableName, dataTableFullName));

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL2)", indexTableName2, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + dataTableFullName + " WHERE VAL1 > 6 LIMIT 3");
assertEquals(3, deleteStmt.executeUpdate());
conn.commit();
}
}

@Test
public void testDeleteCount_index() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1)", indexTableName, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + indexTableFullName + " WHERE \"0:VAL1\" > 6");
assertEquals(5, deleteStmt.executeUpdate());
conn.commit();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,39 @@ public void testUpdateNonIndexedColumn() throws Exception {
}
}

@Test
public void testDeleteCount_nonPK() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = "TBL_" + generateUniqueName();
tkhurana marked this conversation as resolved.
Show resolved Hide resolved
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = "IND_" + generateUniqueName();

try (Connection conn = DriverManager.getConnection(getUrl())) {

conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ this.tableDDLOptions);

conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));

PreparedStatement dataPreparedStatement =
conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= 10; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
dataPreparedStatement.setInt(3, i * 2);
dataPreparedStatement.execute();
}
conn.commit();

PreparedStatement deleteStmt =
conn.prepareStatement("DELETE FROM " + dataTableFullName + " WHERE VAL1 > 6");
assertEquals(5, deleteStmt.executeUpdate());
conn.commit();
}
}

private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
PreparedStatement stmt = tenantConn.prepareStatement(dml);
stmt.setString(1, "00000000000000" + String.valueOf(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,22 @@ public byte[] getRowKey() {

// If auto flush is true, this last batch will be committed upon return
int nCommittedRows = autoFlush ? (rowCount / batchSize * batchSize) : 0;
MutationState state = new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);

// tableRef can be index if the index table is selected by the query plan or if we do the DELETE
// directly on the index table. In other cases it refers to the data table
MutationState tableState =
new MutationState(tableRef, mutations, nCommittedRows, maxSize, maxSizeBytes, connection);
MutationState state;
if (otherTableRefs.isEmpty()) {
state = tableState;
} else {
state = new MutationState(maxSize, maxSizeBytes, connection);
// if there are other table references we need to start with an empty mutation state and
// then join the other states. We only need to count the data table rows that will be deleted.
// MutationState.join() correctly maintains that accounting and ignores the index table rows.
// This way we always return the correct number of rows that are deleted.
state.join(tableState);
}
for (int i = 0; i < otherTableRefs.size(); i++) {
MutationState indexState = new MutationState(otherTableRefs.get(i), otherMutations.get(i), 0, maxSize, maxSizeBytes, connection);
state.join(indexState);
Expand Down Expand Up @@ -911,22 +926,8 @@ public MutationState execute() throws SQLException {
totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
}
// Return total number of rows that have been deleted from the table. In the case of auto commit being off
// the mutations will all be in the mutation state of the current connection. We need to divide by the
// total number of tables we updated as otherwise the client will get an inflated result.
int totalTablesUpdateClientSide = 1; // data table is always updated
PTable bestTable = bestPlan.getTableRef().getTable();
// global immutable tables are also updated client side (but don't double count the data table)
if (bestPlan != dataPlan && isMaintainedOnClient(bestTable)) {
totalTablesUpdateClientSide++;
}
for (TableRef otherTableRef : otherTableRefs) {
PTable otherTable = otherTableRef.getTable();
// Don't double count the data table here (which morphs when it becomes a projected table, hence this check)
if (projectedTableRef != otherTableRef && isMaintainedOnClient(otherTable)) {
totalTablesUpdateClientSide++;
}
}
MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount/totalTablesUpdateClientSide);
// the mutations will all be in the mutation state of the current connection.
MutationState state = new MutationState(maxSize, maxSizeBytes, connection, totalRowCount);

// set the read metrics accumulated in the parent context so that it can be published when the mutations are committed.
state.setReadMetricQueue(context.getReadMetricsQueue());
Expand Down