diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index da009d29f0c..659c8c9f42c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -44,7 +44,9 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.Statement; import java.util.Arrays; import java.util.Collection; import java.util.Map; @@ -471,6 +473,66 @@ public void testLockUntilMVCCAdvanced() throws Exception { assertEquals(1, actualRowCount); } + + @Test + public void testConcurrentUpsertAndSchemaUpdation() throws Exception { + final String tableName = "testConcurrentUpsertAndSchemaUpdation" + generateUniqueName(); + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k VARCHAR PRIMARY KEY, v1 INTEGER, v2 INTEGER ) COLUMN_ENCODED_BYTES = 0"); + Runnable r1 = () -> { + Connection conn1 = null; + try { + Properties props = new Properties(); + String url = QueryUtil.getConnectionUrl(props, config, "client1"); + conn1 = DriverManager.getConnection(url); + for (int i = 0; i < 10; i++) { + String pk = "foo" + i; + int column1 = i; + int column2 = i + 20; + String dml = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)"; + PreparedStatement stmt = conn1.prepareStatement(dml); + stmt.setString(1, pk); + stmt.setInt(2, column1); + stmt.setInt(3, column2); + stmt.execute(); + // Sleep for 5 seconds. + Thread.sleep(5000); + } + conn1.commit(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + conn1.close(); + } catch (SQLException sqle) { + sqle.printStackTrace(); + } + } + }; + Runnable r2 = () -> { + + try { + // Sleep for 10 seconds so that we generate atleast 2 mutations with old schema + Thread.sleep(10000); + Properties props = new Properties(); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + Connection conn2 = DriverManager.getConnection(url2); + conn2.createStatement() + .execute("ALTER TABLE " + tableName + " DROP COLUMN V2"); + conn2.commit(); + } catch (Exception e) { + e.printStackTrace(); + } + }; + Thread t1 = new Thread(r1); + t1.start(); + Thread t2 = new Thread(r2); + t2.start(); + t1.join(); + t2.join(); + } + public static class DelayingRegionObserver extends SimpleRegionObserver { private volatile boolean lockedTableRow; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index ebb3524f765..549c8848e8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -1867,6 +1867,7 @@ public void commit() throws SQLException { boolean retryCommit = false; SQLException sqlE = null; try { + // validate last ddl timestamps send(); txMutations = this.txMutations; sendSuccessful = true;