From 74208e984bb473ca6a9838f117fe5d6b0fc3e86d Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Fri, 13 Jan 2017 11:10:19 +0700 Subject: [PATCH 1/2] IGNITE-4518 Fixed parallel load cache. --- .../store/jdbc/dialect/BasicJdbcDialect.java | 16 ++++--- .../store/jdbc/CacheJdbcPojoStoreTest.java | 48 ++++++++++++++++++- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java index 3ab112af41516..ab8e32bcc4845 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -192,13 +192,17 @@ private static String where(Collection keyCols, int keyCnt) { if (appendUpperBound) { sb.a("("); - for (int cnt = keyCols.size(); cnt > 0; cnt--) { - for (int j = 0; j < cnt; j++) - if (j == cnt - 1) - sb.a(cols[j]).a(" <= ? "); + for (int keyCnt = keyCols.size(); keyCnt > 0; keyCnt--) { + for (int idx = 0, lastIdx = keyCnt - 1; idx < keyCnt; idx++) { + sb.a(cols[idx]); + + if (idx == lastIdx) + sb.a(keyCnt == keyCols.size() ? " <= ? " : " < ? "); else - sb.a(cols[j]).a(" = ? AND "); - if (cnt != 1) + sb.a(" = ? AND "); + } + + if (keyCnt != 1) sb.a(" OR "); } diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java index d8f75d3dcb6ee..4a0b1daf646d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java @@ -216,7 +216,7 @@ public CacheJdbcPojoStoreTest() throws Exception { stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " + - "name varchar(50), salary integer, PRIMARY KEY(id))"); + "name varchar(50), salary integer, PRIMARY KEY(id, org_id, city_id))"); conn.commit(); @@ -349,6 +349,52 @@ else if (k instanceof PersonComplexKey && v instanceof Person) { assertTrue(prnComplexKeys.isEmpty()); } + /** + * @throws Exception If failed. + */ + public void testParallelLoad() throws Exception { + Connection conn = store.openConnection(false); + + PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)"); + + for (int i = 0; i < 8; i++) { + + prnComplexStmt.setInt(1, (i >> 2) & 1); + prnComplexStmt.setInt(2, (i >> 1) & 1); + prnComplexStmt.setInt(3, i % 2); + + prnComplexStmt.setString(4, "name"); + prnComplexStmt.setInt(5, 1000 + i * 500); + + prnComplexStmt.addBatch(); + } + + prnComplexStmt.executeBatch(); + + U.closeQuiet(prnComplexStmt); + + conn.commit(); + + U.closeQuiet(conn); + + final Collection prnComplexKeys = new ConcurrentLinkedQueue<>(); + + IgniteBiInClosure c = new CI2() { + @Override public void apply(Object k, Object v) { + if (k instanceof PersonComplexKey && v instanceof Person) + prnComplexKeys.add((PersonComplexKey)k); + else + fail("Unexpected entry [key=" + k + ", value=" + v + "]"); + } + }; + + store.setParallelLoadCacheMinimumThreshold(2); + + store.loadCache(c); + + assertEquals(8, prnComplexKeys.size()); + } + /** * @throws Exception If failed. */ From 7a83968ccdba43d9c2d9a40f6019dbdaf6816587 Mon Sep 17 00:00:00 2001 From: Andrey Novikov Date: Mon, 16 Jan 2017 10:26:33 +0700 Subject: [PATCH 2/2] IGNITE-4518 Minor fix after review. --- .../store/jdbc/dialect/BasicJdbcDialect.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java index ab8e32bcc4845..139f3fca7504d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java @@ -173,13 +173,15 @@ private static String where(Collection keyCols, int keyCnt) { if (appendLowerBound) { sb.a("("); - for (int cnt = keyCols.size(); cnt > 0; cnt--) { - for (int j = 0; j < cnt; j++) - if (j == cnt - 1) - sb.a(cols[j]).a(" > ? "); + for (int keyCnt = keyCols.size(); keyCnt > 0; keyCnt--) { + for (int idx = 0; idx < keyCnt; idx++) { + if (idx == keyCnt - 1) + sb.a(cols[idx]).a(" > ? "); else - sb.a(cols[j]).a(" = ? AND "); - if (cnt != 1) + sb.a(cols[idx]).a(" = ? AND "); + } + + if (keyCnt != 1) sb.a("OR "); } @@ -196,6 +198,7 @@ private static String where(Collection keyCols, int keyCnt) { for (int idx = 0, lastIdx = keyCnt - 1; idx < keyCnt; idx++) { sb.a(cols[idx]); + // For composite key when not all of the key columns are constrained should use < (strictly less). if (idx == lastIdx) sb.a(keyCnt == keyCols.size() ? " <= ? " : " < ? "); else