From a06a55750c42649acba4a27697809dde463542f1 Mon Sep 17 00:00:00 2001 From: "S.Vladykin" Date: Tue, 17 Mar 2015 05:23:11 +0300 Subject: [PATCH] ignite-sql-tests - jdbc --- .../ignite/jdbc/JdbcEmptyCacheSelfTest.java | 3 + .../processors/cache/IgniteCacheProxy.java | 4 +- .../processors/cache/QueryCursorImpl.java | 18 +++ .../query/jdbc/GridCacheQueryJdbcTask.java | 5 +- .../processors/query/GridQueryProcessor.java | 14 +- .../processors/query/h2/IgniteH2Indexing.java | 143 +++++++----------- .../query/h2/sql/GridSqlQueryParser.java | 33 +++- .../query/h2/sql/GridSqlQuerySplitter.java | 19 +-- .../h2/twostep/GridReduceQueryExecutor.java | 47 +++--- 9 files changed, 150 insertions(+), 136 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java index 9742999c35386..3869dddb53d39 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java @@ -54,6 +54,9 @@ public class JdbcEmptyCacheSelfTest extends GridCommonAbstractTest { cache.setCacheMode(PARTITIONED); cache.setBackups(1); cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setIndexedTypes( + Byte.class, Byte.class + ); cfg.setCacheConfiguration(cache); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index aaa63fd7ddcc7..3216ccc59525a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -489,8 +489,8 @@ private QueryCursor> doLocalQuery(SqlQuery p) { * @return Cursor. */ private QueryCursor> doLocalFieldsQuery(SqlFieldsQuery q) { - return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( - ctx.name(), q.getSql(), q.getArgs())); + return ctx.kernalContext().query().queryLocalFields( + ctx.name(), q.getSql(), q.getArgs()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java index 62e73763c002d..7cb9efc9d4ce0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java @@ -19,6 +19,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.query.*; import java.util.*; @@ -32,6 +33,9 @@ public class QueryCursorImpl implements QueryCursorEx { /** */ private boolean iterTaken; + /** */ + private Collection fieldsMeta; + /** * @param iter Iterator. */ @@ -95,4 +99,18 @@ public QueryCursorImpl(Iterator iter) { } } } + + /** + * @param fieldsMeta SQL Fields query result metadata. + */ + public void fieldsMeta(Collection fieldsMeta) { + this.fieldsMeta = fieldsMeta; + } + + /** + * @return SQL Fields query result metadata. + */ + public Collection fieldsMeta() { + return fieldsMeta; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java index b53a9e16a534b..332c649d993ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java @@ -21,6 +21,7 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -194,7 +195,9 @@ private static class JdbcDriverJob extends ComputeJobAdapter { QueryCursor> cursor = cache.queryFields(qry); - Collection meta = null; // TODO + Collection meta = ((QueryCursorImpl>)cursor).fieldsMeta(); + + assert meta != null; tbls = new ArrayList<>(meta.size()); cols = new ArrayList<>(meta.size()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 11a9f2c6d6b66..aa924c4457f9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; import org.apache.ignite.lang.*; -import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -555,13 +554,12 @@ private static interface ClIter extends AutoCloseable, Iterator { * @param args Arguments. * @return Iterator. */ - public Iterator> queryLocalFields(String space, String sql, Object[] args) { + public QueryCursor> queryLocalFields(String space, String sql, Object[] args) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - IgniteSpiCloseableIterator> iterator = - idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator(); + GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter()); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -579,10 +577,14 @@ public Iterator> queryLocalFields(String space, String sql, Object[] arg null)); } - return iterator; + QueryCursorImpl> cursor = new QueryCursorImpl<>(res.iterator()); + + cursor.fieldsMeta(res.metaData()); + + return cursor; } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw new CacheException(e); } finally { busyLock.leaveBusy(); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index c3a3da388fd3d..1e431db92f61e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,7 +65,6 @@ import java.text.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.*; import static org.apache.ignite.IgniteSystemProperties.*; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*; @@ -496,18 +495,7 @@ private void removeTable(TableDescriptor tbl) throws IgniteCheckedException { if (rs != null) { try { - ResultSetMetaData rsMeta = rs.getMetaData(); - - meta = new ArrayList<>(rsMeta.getColumnCount()); - - for (int i = 1; i <= rsMeta.getColumnCount(); i++) { - String schemaName = rsMeta.getSchemaName(i); - String typeName = rsMeta.getTableName(i); - String name = rsMeta.getColumnLabel(i); - String type = rsMeta.getColumnClassName(i); - - meta.add(new SqlFieldMetadata(schemaName, typeName, name, type)); - } + meta = meta(rs.getMetaData()); } catch (SQLException e) { throw new IgniteSpiException("Failed to get meta data.", e); @@ -521,6 +509,26 @@ private void removeTable(TableDescriptor tbl) throws IgniteCheckedException { } } + /** + * @param rsMeta Metadata. + * @return List of fields metadata. + * @throws SQLException If failed. + */ + private static List meta(ResultSetMetaData rsMeta) throws SQLException { + ArrayList meta = new ArrayList<>(rsMeta.getColumnCount()); + + for (int i = 1; i <= rsMeta.getColumnCount(); i++) { + String schemaName = rsMeta.getSchemaName(i); + String typeName = rsMeta.getTableName(i); + String name = rsMeta.getColumnLabel(i); + String type = rsMeta.getColumnClassName(i); + + meta.add(new SqlFieldMetadata(schemaName, typeName, name, type)); + } + + return meta; + } + /** * @param stmt Prepared statement. * @return Command type. @@ -739,12 +747,38 @@ public void bindParameters(PreparedStatement stmt, @Nullable Collection @Override public QueryCursor> queryTwoStep(String space, String sqlQry, Object[] params) { Connection c = connectionForSpace(space); - GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(c, sqlQry, params); + PreparedStatement stmt; + + try { + stmt = c.prepareStatement(sqlQry); + } + catch (SQLException e) { + throw new CacheException("Failed to parse query: " + sqlQry, e); + } + + GridCacheTwoStepQuery twoStepQry; + Collection meta; + + try { + twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params); + + meta = meta(stmt.getMetaData()); + } + catch (SQLException e) { + throw new CacheException(e); + } + finally { + U.close(stmt, log); + } if (log.isDebugEnabled()) log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry); - return queryTwoStep(space, twoStepQry); + QueryCursorImpl> cursor = (QueryCursorImpl>)queryTwoStep(space, twoStepQry); + + cursor.fieldsMeta(meta); + + return cursor; } /** @@ -1049,7 +1083,7 @@ public GridReduceQueryExecutor reduceQueryExecutor() { if (Utils.serializer != null) U.warn(log, "Custom H2 serialization is already configured, will override."); - Utils.serializer = h2Serializer(ctx != null && ctx.deploy().enabled()); + Utils.serializer = h2Serializer(); String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString(); @@ -1097,83 +1131,10 @@ public GridReduceQueryExecutor reduceQueryExecutor() { } /** - * @param p2pEnabled If peer-deployment is enabled. * @return Serializer. */ - protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) { - return p2pEnabled ? - new JavaObjectSerializer() { - /** */ - private volatile Map ldr2id = Collections.emptyMap(); - - /** */ - private volatile Map id2ldr = Collections.emptyMap(); - - /** */ - private byte ldrIdGen = Byte.MIN_VALUE; - - /** */ - private final Lock lock = new ReentrantLock(); - - @Override public byte[] serialize(Object obj) throws Exception { - ClassLoader ldr = obj.getClass().getClassLoader(); - - Byte ldrId = ldr2id.get(ldr); - - if (ldrId == null) { - lock.lock(); - - try { - ldrId = ldr2id.get(ldr); - - if (ldrId == null) { - ldrId = ldrIdGen++; - - if (id2ldr.containsKey(ldrId)) // Overflow. - throw new IgniteException("Failed to add new peer-to-peer class loader."); - - Map id2ldr0 = new HashMap<>(id2ldr); - Map ldr2id0 = new IdentityHashMap<>(ldr2id); - - id2ldr0.put(ldrId, ldr); - ldr2id0.put(ldr, ldrId); - - ldr2id = ldr2id0; - id2ldr = id2ldr0; - } - } - finally { - lock.unlock(); - } - } - - byte[] bytes = marshaller.marshal(obj); - - int len = bytes.length; - - bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId. - - bytes[len] = ldrId; - - return bytes; - } - - @Override public Object deserialize(byte[] bytes) throws Exception { - int last = bytes.length - 1; - - byte ldrId = bytes[last]; - - ClassLoader ldr = id2ldr.get(ldrId); - - if (ldr == null) - throw new IllegalStateException("Class loader was not found: " + ldrId); - - bytes = Arrays.copyOf(bytes, last); // Trim the last byte. - - return marshaller.unmarshal(bytes, ldr); - } - } : - new JavaObjectSerializer() { + protected JavaObjectSerializer h2Serializer() { + return new JavaObjectSerializer() { @Override public byte[] serialize(Object obj) throws Exception { return marshaller.marshal(obj); } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index a8c83d6fe265d..2e2f9c31cb2f6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.sql; import org.apache.ignite.*; +import org.h2.command.*; import org.h2.command.dml.*; import org.h2.engine.*; import org.h2.expression.*; @@ -28,7 +29,6 @@ import org.jetbrains.annotations.*; import java.lang.reflect.*; -import java.sql.*; import java.util.*; import java.util.Set; @@ -150,18 +150,35 @@ public class GridSqlQueryParser { /** */ private static final Getter FUNC_ALIAS = getter(JavaFunction.class, "functionAlias"); + /** */ + private static final Getter COMMAND = getter(JdbcPreparedStatement.class, "command"); + + /** */ + private static volatile Getter prepared; + /** */ private final IdentityHashMap h2ObjToGridObj = new IdentityHashMap<>(); /** - * @param conn Connection. - * @param select Select query. - * @return Parsed select query. + * @param stmt Prepared statement. + * @return Parsed select. */ - public static GridSqlSelect parse(Connection conn, String select) { - Session ses = (Session)((JdbcConnection)conn).getSession(); + public static GridSqlSelect parse(JdbcPreparedStatement stmt) { + Command cmd = COMMAND.get(stmt); + + Getter p = prepared; + + if (p == null) { + Class cls = cmd.getClass(); + + assert cls.getSimpleName().equals("CommandContainer"); + + prepared = p = getter(cls, "prepared"); + } + + Prepared select = p.get(cmd); - return new GridSqlQueryParser().parse((Select)ses.prepare(select)); + return new GridSqlQueryParser().parse((Select)select); } /** @@ -510,7 +527,7 @@ private static void assert0(boolean cond, Object o) { * @param cls Class. * @param fldName Fld name. */ - private static Getter getter(Class cls, String fldName) { + private static Getter getter(Class cls, String fldName) { Field field; try { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 019ed59221d15..47e5e05d6e152 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -19,9 +19,9 @@ import org.apache.ignite.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.h2.jdbc.*; import org.h2.value.*; -import java.sql.*; import java.util.*; import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.*; @@ -56,16 +56,15 @@ private static String columnName(int idx) { } /** - * @param conn Connection. - * @param query Query. + * @param stmt Prepared statement. * @param params Parameters. * @return Two step query. */ - public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) { + public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params) { if (params == null) params = GridCacheSqlQuery.EMPTY_PARAMS; - GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query); + GridSqlSelect srcQry = GridSqlQueryParser.parse(stmt); final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO @@ -299,16 +298,14 @@ private static void splitSelectExpression(List mapSelect, GridSq String mapColAlias = columnName(idx); String rdcColAlias; - if (alias == null) { // Wrap map column with generated alias if none. + if (alias == null) // Original column name for reduce column. rdcColAlias = el instanceof GridSqlColumn ? ((GridSqlColumn)el).columnName() : mapColAlias; - - alias = alias(mapColAlias, el); // `el` is known not to be alias. - - mapSelect.set(idx, alias); - } else // Set initial alias for reduce column. rdcColAlias = alias.alias(); + // Always wrap map column into generated alias. + mapSelect.set(idx, alias(mapColAlias, el)); // `el` is known not to be an alias. + if (idx < rdcSelect.length) { // SELECT __C0 AS orginal_alias GridSqlElement rdcEl = column(mapColAlias); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index f3d6bfc081190..4c1dde79c8139 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -200,24 +200,37 @@ private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - idx.addPage(new GridResultPage(node.id(), msg, false) { - @Override public void fetchNextPage() { - if (r.rmtErr != null) - throw new CacheException("Next page fetch failed.", r.rmtErr); - - try { - GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); - - if (node.isLocal()) - h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); - else - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - throw new CacheException(e); + GridResultPage page; + + try { + page = new GridResultPage(node.id(), msg, false) { + @Override public void fetchNextPage() { + if (r.rmtErr != null) + throw new CacheException("Next page fetch failed.", r.rmtErr); + + try { + GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); + + if (node.isLocal()) + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } } - } - }); + }; + } + catch (Exception e) { + U.error(log, "Error in message.", e); + + fail(r, node.id(), "Error in message."); + + return; + } + + idx.addPage(page); if (msg.allRows() != -1) // Only the first page contains row count. r.latch.countDown();