diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java index 4a97aefcfcbe0..332bbbaf40925 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java @@ -20,8 +20,6 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; import java.util.Collections; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.QueryEntity; @@ -54,7 +52,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac static final String BASE_URL_BIN = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-bin-config.xml"; /** SQL SELECT query for verification. */ - private static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person"; + static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person"; /** Connection. */ protected Connection conn; @@ -149,51 +147,6 @@ protected String getCfgUrl() { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - try (Statement selStmt = conn.createStatement()) { - assert selStmt.execute(SQL_SELECT); - - ResultSet rs = selStmt.getResultSet(); - - assert rs != null; - - while (rs.next()) { - int id = rs.getInt("id"); - - switch (id) { - case 1: - assertEquals("p1", rs.getString("_key")); - assertEquals("John", rs.getString("firstName")); - assertEquals("White", rs.getString("lastName")); - assertEquals(25, rs.getInt("age")); - break; - - case 2: - assertEquals("p2", rs.getString("_key")); - assertEquals("Joe", rs.getString("firstName")); - assertEquals("Black", rs.getString("lastName")); - assertEquals(35, rs.getInt("age")); - break; - - case 3: - assertEquals("p3", rs.getString("_key")); - assertEquals("Mike", rs.getString("firstName")); - assertEquals("Green", rs.getString("lastName")); - assertEquals(40, rs.getInt("age")); - break; - - case 4: - assertEquals("p4", rs.getString("_key")); - assertEquals("Leah", rs.getString("firstName")); - assertEquals("Grey", rs.getString("lastName")); - assertEquals(22, rs.getInt("age")); - break; - - default: - assert false : "Invalid ID: " + id; - } - } - } - grid(0).cache(null).clear(); assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL)); diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java index 7fc92de20595c..1bd6d34f27005 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.jdbc2; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.HashSet; import java.util.concurrent.Callable; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.testframework.GridTestUtils; /** @@ -61,6 +63,55 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + try (Statement selStmt = conn.createStatement()) { + assertTrue(selStmt.execute(SQL_SELECT)); + + ResultSet rs = selStmt.getResultSet(); + + assert rs != null; + + while (rs.next()) { + int id = rs.getInt("id"); + + switch (id) { + case 1: + assertEquals("p1", rs.getString("_key")); + assertEquals("John", rs.getString("firstName")); + assertEquals("White", rs.getString("lastName")); + assertEquals(25, rs.getInt("age")); + break; + + case 2: + assertEquals("p2", rs.getString("_key")); + assertEquals("Joe", rs.getString("firstName")); + assertEquals("Black", rs.getString("lastName")); + assertEquals(35, rs.getInt("age")); + break; + + case 3: + assertEquals("p3", rs.getString("_key")); + assertEquals("Mike", rs.getString("firstName")); + assertEquals("Green", rs.getString("lastName")); + assertEquals(40, rs.getInt("age")); + break; + + case 4: + assertEquals("p4", rs.getString("_key")); + assertEquals("Leah", rs.getString("firstName")); + assertEquals("Grey", rs.getString("lastName")); + assertEquals(22, rs.getInt("age")); + break; + + default: + assert false : "Invalid ID: " + id; + } + } + } + + grid(0).cache(null).clear(); + + assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL)); + super.afterTest(); if (stmt != null && !stmt.isClosed()) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java index ecf6032db745f..3c56c921bc78b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.jdbc2; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import org.apache.ignite.cache.CachePeekMode; /** * MERGE statement test. @@ -56,6 +58,55 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { + try (Statement selStmt = conn.createStatement()) { + assertTrue(selStmt.execute(SQL_SELECT)); + + ResultSet rs = selStmt.getResultSet(); + + assert rs != null; + + while (rs.next()) { + int id = rs.getInt("id"); + + switch (id) { + case 1: + assertEquals("p1", rs.getString("_key")); + assertEquals("John", rs.getString("firstName")); + assertEquals("White", rs.getString("lastName")); + assertEquals(25, rs.getInt("age")); + break; + + case 2: + assertEquals("p2", rs.getString("_key")); + assertEquals("Joe", rs.getString("firstName")); + assertEquals("Black", rs.getString("lastName")); + assertEquals(35, rs.getInt("age")); + break; + + case 3: + assertEquals("p3", rs.getString("_key")); + assertEquals("Mike", rs.getString("firstName")); + assertEquals("Green", rs.getString("lastName")); + assertEquals(40, rs.getInt("age")); + break; + + case 4: + assertEquals("p4", rs.getString("_key")); + assertEquals("Leah", rs.getString("firstName")); + assertEquals("Grey", rs.getString("lastName")); + assertEquals(22, rs.getInt("age")); + break; + + default: + assert false : "Invalid ID: " + id; + } + } + } + + grid(0).cache(null).clear(); + + assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL)); + super.afterTest(); if (stmt != null && !stmt.isClosed()) diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java new file mode 100644 index 0000000000000..5e206eef2248b --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.Properties; +import org.apache.ignite.IgniteJdbcDriver; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Data streaming test. + */ +public class JdbcStreamingSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** JDBC URL. */ + private static final String BASE_URL = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-config.xml"; + + /** Connection. */ + protected Connection conn; + + /** */ + protected transient IgniteLogger log; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return getConfiguration0(gridName); + } + + /** + * @param gridName Grid name. + * @return Grid configuration used for starting the grid. + * @throws Exception If failed. + */ + private IgniteConfiguration getConfiguration0(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setIndexedTypes( + Integer.class, Integer.class + ); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setConnectorConfiguration(new ConnectorConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Class.forName("org.apache.ignite.IgniteJdbcDriver"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param allowOverwrite Allow overwriting of existing keys. + * @return Connection to use for the test. + * @throws Exception if failed. + */ + private Connection createConnection(boolean allowOverwrite) throws Exception { + Properties props = new Properties(); + + props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true"); + props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500"); + + if (allowOverwrite) + props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true"); + + return DriverManager.getConnection(BASE_URL, props); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + U.closeQuiet(conn); + + ignite(0).cache(null).clear(); + + super.afterTest(); + } + + /** + * @throws Exception if failed. + */ + public void testStreamedInsert() throws Exception { + conn = createConnection(false); + + ignite(0).cache(null).put(5, 500); + + PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)"); + + for (int i = 1; i <= 100000; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i); + + stmt.executeUpdate(); + } + + // Data is not there yet. + assertNull(grid(0).cache(null).get(100000)); + + // Let the stream flush. + U.sleep(1500); + + // Now let's check it's all there. + assertEquals(1, grid(0).cache(null).get(1)); + assertEquals(100000, grid(0).cache(null).get(100000)); + + // 5 should still point to 500. + assertEquals(500, grid(0).cache(null).get(5)); + } + + /** + * @throws Exception if failed. + */ + public void testStreamedInsertWithOverwritesAllowed() throws Exception { + conn = createConnection(true); + + ignite(0).cache(null).put(5, 500); + + PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)"); + + for (int i = 1; i <= 100000; i++) { + stmt.setInt(1, i); + stmt.setInt(2, i); + + stmt.executeUpdate(); + } + + // Data is not there yet. + assertNull(grid(0).cache(null).get(100000)); + + // Let the stream flush. + U.sleep(1500); + + // Now let's check it's all there. + assertEquals(1, grid(0).cache(null).get(1)); + assertEquals(100000, grid(0).cache(null).get(100000)); + + // 5 should now point to 5 as we've turned overwriting on. + assertEquals(5, grid(0).cache(null).get(5)); + } +} diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java new file mode 100644 index 0000000000000..8ae0e906acd69 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.SQLException; +import java.util.Arrays; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.util.typedef.F; + +/** + * + */ +public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelfTest { + /** + * + */ + public void testExecute() throws SQLException { + conn.createStatement().execute("update Person set firstName = 'Jack' where " + + "cast(substring(_key, 2, 1) as int) % 2 = 0"); + + assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")), + jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll()); + } + + /** + * + */ + public void testExecuteUpdate() throws SQLException { + conn.createStatement().executeUpdate("update Person set firstName = 'Jack' where " + + "cast(substring(_key, 2, 1) as int) % 2 = 0"); + + assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")), + jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll()); + } +} diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index c41b75458d72d..7395fcbc7d5ce 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -66,6 +66,7 @@ public static TestSuite suite() throws Exception { suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class)); return suite; } diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java index d432c1e4f9fee..9790b8f9986d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java @@ -292,6 +292,21 @@ public class IgniteJdbcDriver implements Driver { /** Distributed joins parameter name. */ private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins"; + /** DML streaming parameter name. */ + private static final String PARAM_STREAMING = "streaming"; + + /** DML streaming auto flush frequency. */ + private static final String PARAM_STREAMING_FLUSH_FREQ = "streamingFlushFrequency"; + + /** DML streaming node buffer size. */ + private static final String PARAM_STREAMING_PER_NODE_BUF_SIZE = "streamingPerNodeBufferSize"; + + /** DML streaming parallel operations per node. */ + private static final String PARAM_STREAMING_PER_NODE_PAR_OPS = "streamingPerNodeParallelOperations"; + + /** Whether DML streaming will overwrite existing cache entries. */ + private static final String PARAM_STREAMING_ALLOW_OVERWRITE = "streamingAllowOverwrite"; + /** Hostname property name. */ public static final String PROP_HOST = PROP_PREFIX + "host"; @@ -313,6 +328,21 @@ public class IgniteJdbcDriver implements Driver { /** Distributed joins property name. */ public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; + /** DML streaming property name. */ + public static final String PROP_STREAMING = PROP_PREFIX + PARAM_STREAMING; + + /** DML stream auto flush frequency property name. */ + public static final String PROP_STREAMING_FLUSH_FREQ = PROP_PREFIX + PARAM_STREAMING_FLUSH_FREQ; + + /** DML stream node buffer size property name. */ + public static final String PROP_STREAMING_PER_NODE_BUF_SIZE = PROP_PREFIX + PARAM_STREAMING_PER_NODE_BUF_SIZE; + + /** DML stream parallel operations per node property name. */ + public static final String PROP_STREAMING_PER_NODE_PAR_OPS = PROP_PREFIX + PARAM_STREAMING_PER_NODE_PAR_OPS; + + /** Whether DML streaming will overwrite existing cache entries. */ + public static final String PROP_STREAMING_ALLOW_OVERWRITE = PROP_PREFIX + PARAM_STREAMING_ALLOW_OVERWRITE; + /** Cache name property name. */ public static final String PROP_CFG = PROP_PREFIX + "cfg"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 5c4a147174b45..4244602b3c649 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -48,17 +48,21 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteJdbcDriver; import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.compute.ComputeTaskTimeoutException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.resources.IgniteInstanceResource; @@ -73,6 +77,11 @@ import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS; import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL; import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID; +import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING; +import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE; +import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ; +import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_BUF_SIZE; +import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_PAR_OPS; /** * JDBC connection implementation. @@ -118,6 +127,21 @@ public class JdbcConnection implements Connection { /** Distributed joins flag. */ private boolean distributedJoins; + /** Make this connection streaming oriented, and prepared statements - data streamer aware. */ + private final boolean stream; + + /** Auto flush frequency for streaming. */ + private final long streamFlushTimeout; + + /** Node buffer size for data streamer. */ + private final int streamNodeBufSize; + + /** Parallel ops count per node for data streamer. */ + private final int streamNodeParOps; + + /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */ + private final boolean streamAllowOverwrite; + /** Statements. */ final Set statements = new HashSet<>(); @@ -139,6 +163,14 @@ public JdbcConnection(String url, Properties props) throws SQLException { this.collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED)); this.distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS)); + stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING)); + streamAllowOverwrite = Boolean.parseBoolean(props.getProperty(PROP_STREAMING_ALLOW_OVERWRITE)); + streamFlushTimeout = Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0")); + streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE, + String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE))); + streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, + String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS))); + String nodeIdProp = props.getProperty(PROP_NODE_ID); if (nodeIdProp != null) @@ -291,6 +323,14 @@ private IgniteConfiguration loadConfiguration(String cfgUrl) { closed = true; + for (Iterator it = statements.iterator(); it.hasNext();) { + JdbcStatement stmt = it.next(); + + stmt.closeInternal(); + + it.remove(); + } + IgniteNodeFuture fut = NODES.get(cfg); if (fut != null && fut.release()) { @@ -299,14 +339,6 @@ private IgniteConfiguration loadConfiguration(String cfgUrl) { if (ignite != null) ignite.close(); } - - for (Iterator it = statements.iterator(); it.hasNext();) { - JdbcStatement stmt = it.next(); - - stmt.closeInternal(); - - it.remove(); - } } /** {@inheritDoc} */ @@ -487,7 +519,18 @@ private IgniteConfiguration loadConfiguration(String cfgUrl) { if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT) throw new SQLFeatureNotSupportedException("Invalid holdability (transactions are not supported)."); - JdbcPreparedStatement stmt = new JdbcPreparedStatement(this, sql); + JdbcPreparedStatement stmt; + + if (!stream) + stmt = new JdbcPreparedStatement(this, sql); + else { + PreparedStatement nativeStmt = prepareNativeStatement(sql); + + IgniteDataStreamer streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName, + nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite); + + stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt); + } statements.add(stmt); @@ -646,12 +689,17 @@ private IgniteConfiguration loadConfiguration(String cfgUrl) { /** {@inheritDoc} */ @Override public void setSchema(String schema) throws SQLException { - cacheName = schema; + assert ignite instanceof IgniteEx; + + cacheName = ((IgniteEx)ignite).context().query().space(schema); } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public String getSchema() throws SQLException { - return cacheName; + String sqlSchema = ignite.cache(cacheName).getConfiguration(CacheConfiguration.class).getSqlSchema(); + + return U.firstNotNull(sqlSchema, cacheName, ""); } /** {@inheritDoc} */ @@ -749,7 +797,7 @@ JdbcStatement createStatement0() throws SQLException { */ PreparedStatement prepareNativeStatement(String sql) throws SQLException { return ((IgniteCacheProxy) ignite().cache(cacheName())).context() - .kernalContext().query().prepareNativeStatement(cacheName(), sql); + .kernalContext().query().prepareNativeStatement(getSchema(), sql); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java index 57badd2df728e..54e58e9900a2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java @@ -17,12 +17,28 @@ package org.apache.ignite.internal.jdbc2; -import java.io.*; -import java.math.*; -import java.net.*; -import java.sql.*; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; import java.sql.Date; -import java.util.*; +import java.sql.NClob; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Calendar; /** * JDBC prepared statement implementation. @@ -31,10 +47,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat /** SQL query. */ private final String sql; - /** - * H2's parsed statement to retrieve metadata from. - */ - private PreparedStatement nativeStatement; + /** H2's parsed statement to retrieve metadata from. */ + PreparedStatement nativeStatement; /** * Creates new prepared statement. @@ -55,8 +69,6 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement."); } - - /** {@inheritDoc} */ @Override public ResultSet executeQuery() throws SQLException { ensureNotClosed(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java index d7e387f91f571..44db3757eee40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java @@ -140,7 +140,7 @@ public class JdbcStatement implements Statement { updateCnt = -1; - return doUpdate(sql, getArgs()); + return Long.valueOf(doUpdate(sql, getArgs())).intValue(); } /** @@ -148,9 +148,9 @@ public class JdbcStatement implements Statement { * @param sql SQL query. * @param args Update arguments. * @return Number of affected items. - * @throws SQLException + * @throws SQLException If failed. */ - int doUpdate(String sql, Object[] args) throws SQLException { + long doUpdate(String sql, Object[] args) throws SQLException { if (F.isEmpty(sql)) throw new SQLException("SQL query is empty"); @@ -172,11 +172,7 @@ int doUpdate(String sql, Object[] args) throws SQLException { JdbcQueryTaskV2.QueryResult qryRes = loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask); - Long res = updateCounterFromQueryResult(qryRes.getRows()); - - updateCnt = res; - - return res.intValue(); + return updateCnt = updateCounterFromQueryResult(qryRes.getRows()); } catch (IgniteSQLException e) { throw e.toJdbcException(); @@ -194,12 +190,12 @@ int doUpdate(String sql, Object[] args) throws SQLException { * @return update counter, if found * @throws SQLException if getting an update counter from result proved to be impossible. */ - private static Long updateCounterFromQueryResult(List> rows) throws SQLException { + private static long updateCounterFromQueryResult(List> rows) throws SQLException { if (F.isEmpty(rows)) - return 0L; + return -1; if (rows.size() != 1) - throw new SQLException("Expected number of rows of 1 for update operation"); + throw new SQLException("Expected fetch size of 1 for update operation"); List row = rows.get(0); @@ -211,7 +207,7 @@ private static Long updateCounterFromQueryResult(List> rows) throws SQLE if (!(objRes instanceof Long)) throw new SQLException("Unexpected update result type"); - return (Long) objRes; + return (Long)objRes; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java new file mode 100644 index 0000000000000..019923f921351 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.internal.IgniteEx; + +/** + * Prepared statement associated with a data streamer. + */ +class JdbcStreamedPreparedStatement extends JdbcPreparedStatement { + /** */ + private final IgniteDataStreamer streamer; + + /** + * Creates new prepared statement. + * + * @param conn Connection. + * @param sql SQL query. + * @param streamer Data streamer to use with this statement. Will be closed on statement close. + */ + JdbcStreamedPreparedStatement(JdbcConnection conn, String sql, IgniteDataStreamer streamer, + PreparedStatement nativeStmt) { + super(conn, sql); + + this.streamer = streamer; + + nativeStatement = nativeStmt; + } + + /** {@inheritDoc} */ + @Override void closeInternal() throws SQLException { + streamer.close(false); + + super.closeInternal(); + } + + /** {@inheritDoc} */ + @Override long doUpdate(String sql, Object[] args) throws SQLException { + return ((IgniteEx)conn.ignite()).context().query().streamUpdateQuery(conn.cacheName(), streamer, sql, args); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index ca047244a97f9..2abb3a952f3f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -23,6 +23,7 @@ import java.util.List; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; @@ -98,6 +99,19 @@ public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, Str Collection params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout, GridQueryCancel cancel) throws IgniteCheckedException; + /** + * Perform a MERGE statement using data streamer as receiver. + * + * @param spaceName Space name. + * @param qry Query. + * @param params Query parameters. + * @param streamer Data streamer to feed data to. + * @return Query result. + * @throws IgniteCheckedException If failed. + */ + public long streamUpdateQuery(@Nullable final String spaceName, final String qry, + @Nullable final Object[] params, IgniteDataStreamer streamer) throws IgniteCheckedException; + /** * Executes regular query. * @@ -240,6 +254,14 @@ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Cach */ public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException; + /** + * Gets space name from database schema. + * + * @param schemaName Schema name. Could not be null. Could be empty. + * @return Space name. Could be null. + */ + public String space(String schemaName); + /** * Collect queries that already running more than specified duration. * @@ -259,4 +281,17 @@ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Cach * Cancels all executing queries. */ public void cancelAllQueries(); + + /** + * @param spaceName Space name. + * @param nativeStmt Native statement. + * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}. + * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)} + * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)} + * @param allowOverwrite Overwrite existing cache values on key duplication. + * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata; + * {@code null} if given statement is a query. + */ + public IgniteDataStreamer createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq, + int nodeBufSize, int nodeParOps, boolean allowOverwrite); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index ee9224b7c56e8..c6d8270d68528 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -17,12 +17,11 @@ package org.apache.ignite.internal.processors.query; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.concurrent.TimeUnit; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; @@ -40,9 +39,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryField; import org.apache.ignite.binary.BinaryObject; @@ -817,6 +818,36 @@ public QueryCursor> queryTwoStep(final GridCacheContext cctx, final } } + /** + * @param spaceName Cache name. + * @param streamer Data streamer. + * @param qry Query. + * @return Iterator. + */ + public long streamUpdateQuery(@Nullable final String spaceName, + final IgniteDataStreamer streamer, final String qry, final Object[] args) { + assert streamer != null; + + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + GridCacheContext cctx = ctx.cache().cache(spaceName).context(); + + return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX() { + @Override public Long applyx() throws IgniteCheckedException { + return idx.streamUpdateQuery(spaceName, qry, args, streamer); + } + }, true); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + busyLock.leaveBusy(); + } + } + /** * @param cctx Cache context. * @param qry Query. @@ -964,7 +995,6 @@ private void sendQueryExecutedEvent(String sqlQry, Object[] params) { } /** - * * @param schema Schema. * @param sql Query. * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2. @@ -975,6 +1005,31 @@ public PreparedStatement prepareNativeStatement(String schema, String sql) throw return idx.prepareNativeStatement(schema, sql); } + /** + * @param schema Schema name. + * @return space (cache) name from schema name. + */ + public String space(String schema) throws SQLException { + checkxEnabled(); + + return idx.space(schema); + } + + /** + * @param spaceName Space name. + * @param nativeStmt Native statement. + * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}. + * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)} + * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)} + * @param allowOverwrite Overwrite existing cache values on key duplication. + * @see IgniteDataStreamer#allowOverwrite + * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata. + */ + public IgniteDataStreamer createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq, + int nodeBufSize, int nodeParOps, boolean allowOverwrite) { + return idx.createStreamer(spaceName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite); + } + /** * @param timeout Timeout. * @param timeUnit Time unit. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index 40307581b93e1..78c5bbcca72da 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -21,7 +21,6 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -40,6 +39,7 @@ import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.binary.BinaryArrayIdentityResolver; import org.apache.ignite.binary.BinaryObject; @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments; import org.apache.ignite.internal.processors.query.h2.dml.KeyValueSupplier; +import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan; import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; @@ -126,7 +127,7 @@ public class DmlStatementsProcessor { * @return Update result (modified items count and failed keys). * @throws IgniteCheckedException if failed. */ - private long updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, + private UpdateResult updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { Object[] errKeys = null; @@ -156,23 +157,27 @@ else if (!opCtx.isKeepBinary()) UpdateResult r; try { - r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, - cancel, errKeys); + r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys); } finally { cctx.operationContextPerCall(opCtx); } - if (F.isEmpty(r.errKeys)) - return r.cnt + items; - else { - items += r.cnt; - errKeys = r.errKeys; - } + items += r.cnt; + errKeys = r.errKeys; + + if (F.isEmpty(errKeys)) + break; } - throw new IgniteSQLException("Failed to update or delete some keys: " + Arrays.deepToString(errKeys), - IgniteQueryErrorCode.CONCURRENT_UPDATE); + if (F.isEmpty(errKeys)) { + if (items == 1L) + return UpdateResult.ONE; + else if (items == 0L) + return UpdateResult.ZERO; + } + + return new UpdateResult(items, errKeys); } /** @@ -186,9 +191,14 @@ else if (!opCtx.isKeepBinary()) @SuppressWarnings("unchecked") QueryCursorImpl> updateSqlFieldsTwoStep(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException { - long res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel); + UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel); + + QueryCursorImpl> resCur = (QueryCursorImpl>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(res.cnt)), null, false); - return cursorForUpdateResult(res); + resCur.fieldsMeta(UPDATE_RESULT_META); + + return resCur; } /** @@ -203,10 +213,95 @@ QueryCursorImpl> updateSqlFieldsTwoStep(String spaceName, PreparedStatem @SuppressWarnings("unchecked") GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { - long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel); + UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel); return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, - new IgniteSingletonIterator(Collections.singletonList(res))); + new IgniteSingletonIterator(Collections.singletonList(res.cnt))); + } + + /** + * Perform given statement against given data streamer. Only rows based INSERT and MERGE are supported + * as well as key bound UPDATE and DELETE (ones with filter {@code WHERE _key = ?}). + * + * @param streamer Streamer to feed data to. + * @param stmt Statement. + * @param args Statement arguments. + * @return Number of rows in given statement for INSERT and MERGE, {@code 1} otherwise. + * @throws IgniteCheckedException if failed. + */ + @SuppressWarnings({"unchecked", "ConstantConditions"}) + long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args) + throws IgniteCheckedException { + args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY); + + Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt); + + assert p != null; + + UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null); + + if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().namex())) + throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" + + " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + if (plan.mode == UpdateMode.INSERT && plan.rowsNum > 0) { + assert plan.isLocSubqry; + + final GridCacheContext cctx = plan.tbl.rowDescriptor().context(); + + QueryCursorImpl> cur; + + final ArrayList> data = new ArrayList<>(plan.rowsNum); + + final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, + F.asList(args), null, false, 0, null); + + QueryCursorImpl> stepCur = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + try { + return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }, null); + + data.addAll(stepCur.getAll()); + + cur = new QueryCursorImpl<>(new Iterable>() { + @Override public Iterator> iterator() { + return data.iterator(); + } + }, null); + + GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); + + if (plan.rowsNum == 1) { + IgniteBiTuple t = rowToKeyValue(cctx, cur.iterator().next().toArray(), plan.colNames, plan.colTypes, + plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc); + + streamer.addData(t.getKey(), t.getValue()); + + return 1; + } + + Map rows = new LinkedHashMap<>(plan.rowsNum); + + for (List row : cur) { + final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.colTypes, + plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc); + + rows.put(t.getKey(), t.getValue()); + } + + streamer.addData(rows); + + return rows.size(); + } + else + throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); } /** @@ -214,31 +309,21 @@ GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement s * @param cctx Cache context. * @param prepStmt Prepared statement for DML query. * @param filters Space name and key filter. - * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction. - * @return Pair [number of successfully processed items; keys that have failed to be processed] + * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction. @return Pair [number of successfully processed items; keys that have failed to be processed] * @throws IgniteCheckedException if failed. */ - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "unchecked"}) private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt, - SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys) - throws IgniteCheckedException { + SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, + Object[] failedKeys) throws IgniteCheckedException { Integer errKeysPos = null; - Object[] params = fieldsQry.getArgs(); - - if (!F.isEmpty(failedKeys)) { - int paramsCnt = F.isEmpty(params) ? 0 : params.length; - params = Arrays.copyOf(U.firstNotNull(params, X.EMPTY_OBJECT_ARRAY), paramsCnt + 1); - params[paramsCnt] = failedKeys; - errKeysPos = paramsCnt; // Last position - } - UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos); if (plan.fastUpdateArgs != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; - return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY); + return doFastUpdate(plan, fieldsQry.getArgs()); } assert !F.isEmpty(plan.selectQry); @@ -249,7 +334,7 @@ private UpdateResult executeUpdateStatement(final GridCacheContext cctx, Prepare // subquery and not some dummy stuff like "select 1, 2, 3;" if (!loc && !plan.isLocSubqry) { SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated()) - .setArgs(params) + .setArgs(fieldsQry.getArgs()) .setDistributedJoins(fieldsQry.isDistributedJoins()) .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder()) .setLocal(fieldsQry.isLocal()) @@ -259,8 +344,8 @@ private UpdateResult executeUpdateStatement(final GridCacheContext cctx, Prepare cur = (QueryCursorImpl>) indexing.queryTwoStep(cctx, newFieldsQry, cancel); } else { - final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params), - filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); + final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, + F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel); cur = new QueryCursorImpl<>(new Iterable>() { @Override public Iterator> iterator() { @@ -272,8 +357,6 @@ private UpdateResult executeUpdateStatement(final GridCacheContext cctx, Prepare } } }, cancel); - - cur.fieldsMeta(res.metaData()); } int pageSize = loc ? 0 : fieldsQry.getPageSize(); @@ -337,38 +420,41 @@ private UpdatePlan getPlanForStatement(String spaceName, PreparedStatement prepS /** * Perform single cache operation based on given args. - * @param params Query parameters. + * @param args Query parameters. * @return 1 if an item was affected, 0 otherwise. * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws IgniteCheckedException { + private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException { GridCacheContext cctx = plan.tbl.rowDescriptor().context(); FastUpdateArguments singleUpdate = plan.fastUpdateArgs; assert singleUpdate != null; - int res; + boolean valBounded = (singleUpdate.val != FastUpdateArguments.NULL_ARGUMENT); - Object key = singleUpdate.key.apply(params); - Object val = singleUpdate.val.apply(params); - Object newVal = singleUpdate.newVal.apply(params); + if (singleUpdate.newVal != FastUpdateArguments.NULL_ARGUMENT) { // Single item UPDATE + Object key = singleUpdate.key.apply(args); + Object newVal = singleUpdate.newVal.apply(args); - if (newVal != null) { // Single item UPDATE - if (val == null) // No _val bound in source query - res = cctx.cache().replace(key, newVal) ? 1 : 0; + if (valBounded) { + Object val = singleUpdate.val.apply(args); + + return (cctx.cache().replace(key, val, newVal) ? UpdateResult.ONE : UpdateResult.ZERO); + } else - res = cctx.cache().replace(key, val, newVal) ? 1 : 0; + return (cctx.cache().replace(key, newVal) ? UpdateResult.ONE : UpdateResult.ZERO); } else { // Single item DELETE - if (val == null) // No _val bound in source query - res = cctx.cache().remove(key) ? 1 : 0; + Object key = singleUpdate.key.apply(args); + Object val = singleUpdate.val.apply(args); + + if (singleUpdate.val == FastUpdateArguments.NULL_ARGUMENT) // No _val bound in source query + return cctx.cache().remove(key) ? UpdateResult.ONE : UpdateResult.ZERO; else - res = cctx.cache().remove(key, val) ? 1 : 0; + return cctx.cache().remove(key, val) ? UpdateResult.ONE : UpdateResult.ZERO; } - - return res; } /** @@ -379,7 +465,7 @@ private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws Igni * @return Results of DELETE (number of items affected AND keys that failed to be updated). */ @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"}) - private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl> cursor, int pageSize) + private UpdateResult doDelete(GridCacheContext cctx, Iterable> cursor, int pageSize) throws IgniteCheckedException { // With DELETE, we have only two columns - key and value. long res = 0; @@ -449,7 +535,7 @@ private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl> cu * had been modified concurrently (arguments for a re-run)]. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl> cursor, int pageSize) + private UpdateResult doUpdate(UpdatePlan plan, Iterable> cursor, int pageSize) throws IgniteCheckedException { GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); @@ -575,7 +661,6 @@ private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl> cursor, throw new IgniteSQLException(resEx); } - return new UpdateResult(res, failedKeys.toArray()); } @@ -689,7 +774,7 @@ private static PageProcessingErrorResult splitErrors(Map> cursor, int pageSize) throws IgniteCheckedException { + private long doMerge(UpdatePlan plan, Iterable> cursor, int pageSize) throws IgniteCheckedException { GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); GridCacheContext cctx = desc.context(); @@ -735,7 +820,7 @@ private long doMerge(UpdatePlan plan, QueryCursorImpl> cursor, int pageS * @throws IgniteCheckedException if failed, particularly in case of duplicate keys. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private long doInsert(UpdatePlan plan, QueryCursorImpl> cursor, int pageSize) throws IgniteCheckedException { + private long doInsert(UpdatePlan plan, Iterable> cursor, int pageSize) throws IgniteCheckedException { GridH2RowDescriptor desc = plan.tbl.rowDescriptor(); GridCacheContext cctx = desc.context(); @@ -999,24 +1084,14 @@ private EntryValueUpdater(Object val) { } } - /** - * Wrap result of DML operation (number of items affected) to Iterable suitable to be wrapped by cursor. - * - * @param itemsCnt Update result to wrap. - * @return Resulting Iterable. - */ - @SuppressWarnings("unchecked") - private static QueryCursorImpl> cursorForUpdateResult(long itemsCnt) { - QueryCursorImpl> res = - new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false); - - res.fieldsMeta(UPDATE_RESULT_META); - - return res; - } - /** Update result - modifications count and keys to re-run query with, if needed. */ private final static class UpdateResult { + /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */ + final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY); + + /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */ + final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY); + /** Number of processed items. */ final long cnt; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index e4b0c1feb7a21..8088f805ff2be 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -56,6 +56,7 @@ import javax.cache.Cache; import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheMemoryMode; @@ -82,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProperty; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine; import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap; @@ -136,6 +137,7 @@ import org.h2.api.JavaObjectSerializer; import org.h2.command.CommandInterface; import org.h2.command.Prepared; +import org.h2.command.dml.Insert; import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.index.Index; @@ -440,7 +442,32 @@ private PreparedStatement prepareStatement(Connection c, String sql, boolean use /** {@inheritDoc} */ @Override public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException { - return prepareStatement(connectionForSpace(schema), sql, false); + return prepareStatement(connectionForSpace(space(schema)), sql, true); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public IgniteDataStreamer createStreamer(String spaceName, PreparedStatement nativeStmt, + long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) { + Prepared prep = GridSqlQueryParser.prepared((JdbcPreparedStatement) nativeStmt); + + if (!(prep instanceof Insert)) + throw new IgniteSQLException("Only INSERT operations are supported in streaming mode", + IgniteQueryErrorCode.UNSUPPORTED_OPERATION); + + IgniteDataStreamer streamer = ctx.grid().dataStreamer(spaceName); + + streamer.autoFlushFrequency(autoFlushFreq); + + streamer.allowOverwrite(allowOverwrite); + + if (nodeBufSize > 0) + streamer.perNodeBufferSize(nodeBufSize); + + if (nodeParOps > 0) + streamer.perNodeParallelOperations(nodeParOps); + + return streamer; } /** @@ -873,6 +900,23 @@ private void removeTable(TableDescriptor tbl) throws IgniteCheckedException { }; } + /** {@inheritDoc} */ + @Override public long streamUpdateQuery(@Nullable String spaceName, String qry, + @Nullable Object[] params, IgniteDataStreamer streamer) throws IgniteCheckedException { + final Connection conn = connectionForSpace(spaceName); + + final PreparedStatement stmt; + + try { + stmt = prepareStatement(conn, qry, true); + } + catch (SQLException e) { + throw new IgniteSQLException(e); + } + + return dmlProc.streamUpdateQuery(streamer, stmt, params); + } + /** * @param rsMeta Metadata. * @return List of fields metadata. @@ -1680,13 +1724,8 @@ private void cleanupStatementCache() { } } - /** - * Gets space name from database schema. - * - * @param schemaName Schema name. Could not be null. Could be empty. - * @return Space name. Could be null. - */ - public String space(String schemaName) { + /** {@inheritDoc} */ + @Override public String space(String schemaName) { assert schemaName != null; Schema schema = schemas.get(schemaName);