Skip to content

Commit

Permalink
ignite-sql-tests - jdbc
Browse files Browse the repository at this point in the history
  • Loading branch information
S.Vladykin committed Mar 17, 2015
1 parent cfcb9a4 commit a06a557
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 136 deletions.
Expand Up @@ -54,6 +54,9 @@ public class JdbcEmptyCacheSelfTest extends GridCommonAbstractTest {
cache.setCacheMode(PARTITIONED);
cache.setBackups(1);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setIndexedTypes(
Byte.class, Byte.class
);

cfg.setCacheConfiguration(cache);

Expand Down
Expand Up @@ -489,8 +489,8 @@ private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) {
* @return Cursor.
*/
private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) {
return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
ctx.name(), q.getSql(), q.getArgs()));
return ctx.kernalContext().query().queryLocalFields(
ctx.name(), q.getSql(), q.getArgs());
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.query.*;

import java.util.*;

Expand All @@ -32,6 +33,9 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
/** */
private boolean iterTaken;

/** */
private Collection<GridQueryFieldMetadata> fieldsMeta;

/**
* @param iter Iterator.
*/
Expand Down Expand Up @@ -95,4 +99,18 @@ public QueryCursorImpl(Iterator<T> iter) {
}
}
}

/**
* @param fieldsMeta SQL Fields query result metadata.
*/
public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
this.fieldsMeta = fieldsMeta;
}

/**
* @return SQL Fields query result metadata.
*/
public Collection<GridQueryFieldMetadata> fieldsMeta() {
return fieldsMeta;
}
}
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.cache.query.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.query.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
Expand Down Expand Up @@ -194,7 +195,9 @@ private static class JdbcDriverJob extends ComputeJobAdapter {

QueryCursor<List<?>> cursor = cache.queryFields(qry);

Collection<GridQueryFieldMetadata> meta = null; // TODO
Collection<GridQueryFieldMetadata> meta = ((QueryCursorImpl<List<?>>)cursor).fieldsMeta();

assert meta != null;

tbls = new ArrayList<>(meta.size());
cols = new ArrayList<>(meta.size());
Expand Down
Expand Up @@ -35,7 +35,6 @@
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.indexing.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
Expand Down Expand Up @@ -555,13 +554,12 @@ private static interface ClIter<X> extends AutoCloseable, Iterator<X> {
* @param args Arguments.
* @return Iterator.
*/
public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] args) {
public QueryCursor<List<?>> queryLocalFields(String space, String sql, Object[] args) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is stopping).");

try {
IgniteSpiCloseableIterator<List<?>> iterator =
idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator();
GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());

if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
ctx.event().record(new CacheQueryExecutedEvent<>(
Expand All @@ -579,10 +577,14 @@ public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] arg
null));
}

return iterator;
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(res.iterator());

cursor.fieldsMeta(res.metaData());

return cursor;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
throw new CacheException(e);
}
finally {
busyLock.leaveBusy();
Expand Down
Expand Up @@ -65,7 +65,6 @@
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*;
Expand Down Expand Up @@ -496,18 +495,7 @@ private void removeTable(TableDescriptor tbl) throws IgniteCheckedException {

if (rs != null) {
try {
ResultSetMetaData rsMeta = rs.getMetaData();

meta = new ArrayList<>(rsMeta.getColumnCount());

for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
String schemaName = rsMeta.getSchemaName(i);
String typeName = rsMeta.getTableName(i);
String name = rsMeta.getColumnLabel(i);
String type = rsMeta.getColumnClassName(i);

meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
}
meta = meta(rs.getMetaData());
}
catch (SQLException e) {
throw new IgniteSpiException("Failed to get meta data.", e);
Expand All @@ -521,6 +509,26 @@ private void removeTable(TableDescriptor tbl) throws IgniteCheckedException {
}
}

/**
* @param rsMeta Metadata.
* @return List of fields metadata.
* @throws SQLException If failed.
*/
private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());

for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
String schemaName = rsMeta.getSchemaName(i);
String typeName = rsMeta.getTableName(i);
String name = rsMeta.getColumnLabel(i);
String type = rsMeta.getColumnClassName(i);

meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
}

return meta;
}

/**
* @param stmt Prepared statement.
* @return Command type.
Expand Down Expand Up @@ -739,12 +747,38 @@ public void bindParameters(PreparedStatement stmt, @Nullable Collection<Object>
@Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) {
Connection c = connectionForSpace(space);

GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(c, sqlQry, params);
PreparedStatement stmt;

try {
stmt = c.prepareStatement(sqlQry);
}
catch (SQLException e) {
throw new CacheException("Failed to parse query: " + sqlQry, e);
}

GridCacheTwoStepQuery twoStepQry;
Collection<GridQueryFieldMetadata> meta;

try {
twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params);

meta = meta(stmt.getMetaData());
}
catch (SQLException e) {
throw new CacheException(e);
}
finally {
U.close(stmt, log);
}

if (log.isDebugEnabled())
log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry);

return queryTwoStep(space, twoStepQry);
QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry);

cursor.fieldsMeta(meta);

return cursor;
}

/**
Expand Down Expand Up @@ -1049,7 +1083,7 @@ public GridReduceQueryExecutor reduceQueryExecutor() {
if (Utils.serializer != null)
U.warn(log, "Custom H2 serialization is already configured, will override.");

Utils.serializer = h2Serializer(ctx != null && ctx.deploy().enabled());
Utils.serializer = h2Serializer();

String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString();

Expand Down Expand Up @@ -1097,83 +1131,10 @@ public GridReduceQueryExecutor reduceQueryExecutor() {
}

/**
* @param p2pEnabled If peer-deployment is enabled.
* @return Serializer.
*/
protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) {
return p2pEnabled ?
new JavaObjectSerializer() {
/** */
private volatile Map<ClassLoader, Byte> ldr2id = Collections.emptyMap();

/** */
private volatile Map<Byte, ClassLoader> id2ldr = Collections.emptyMap();

/** */
private byte ldrIdGen = Byte.MIN_VALUE;

/** */
private final Lock lock = new ReentrantLock();

@Override public byte[] serialize(Object obj) throws Exception {
ClassLoader ldr = obj.getClass().getClassLoader();

Byte ldrId = ldr2id.get(ldr);

if (ldrId == null) {
lock.lock();

try {
ldrId = ldr2id.get(ldr);

if (ldrId == null) {
ldrId = ldrIdGen++;

if (id2ldr.containsKey(ldrId)) // Overflow.
throw new IgniteException("Failed to add new peer-to-peer class loader.");

Map<Byte, ClassLoader> id2ldr0 = new HashMap<>(id2ldr);
Map<ClassLoader, Byte> ldr2id0 = new IdentityHashMap<>(ldr2id);

id2ldr0.put(ldrId, ldr);
ldr2id0.put(ldr, ldrId);

ldr2id = ldr2id0;
id2ldr = id2ldr0;
}
}
finally {
lock.unlock();
}
}

byte[] bytes = marshaller.marshal(obj);

int len = bytes.length;

bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId.

bytes[len] = ldrId;

return bytes;
}

@Override public Object deserialize(byte[] bytes) throws Exception {
int last = bytes.length - 1;

byte ldrId = bytes[last];

ClassLoader ldr = id2ldr.get(ldrId);

if (ldr == null)
throw new IllegalStateException("Class loader was not found: " + ldrId);

bytes = Arrays.copyOf(bytes, last); // Trim the last byte.

return marshaller.unmarshal(bytes, ldr);
}
} :
new JavaObjectSerializer() {
protected JavaObjectSerializer h2Serializer() {
return new JavaObjectSerializer() {
@Override public byte[] serialize(Object obj) throws Exception {
return marshaller.marshal(obj);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.sql;

import org.apache.ignite.*;
import org.h2.command.*;
import org.h2.command.dml.*;
import org.h2.engine.*;
import org.h2.expression.*;
Expand All @@ -28,7 +29,6 @@
import org.jetbrains.annotations.*;

import java.lang.reflect.*;
import java.sql.*;
import java.util.*;
import java.util.Set;

Expand Down Expand Up @@ -150,18 +150,35 @@ public class GridSqlQueryParser {
/** */
private static final Getter<JavaFunction, FunctionAlias> FUNC_ALIAS = getter(JavaFunction.class, "functionAlias");

/** */
private static final Getter<JdbcPreparedStatement,Command> COMMAND = getter(JdbcPreparedStatement.class, "command");

/** */
private static volatile Getter<Command,Prepared> prepared;

/** */
private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>();

/**
* @param conn Connection.
* @param select Select query.
* @return Parsed select query.
* @param stmt Prepared statement.
* @return Parsed select.
*/
public static GridSqlSelect parse(Connection conn, String select) {
Session ses = (Session)((JdbcConnection)conn).getSession();
public static GridSqlSelect parse(JdbcPreparedStatement stmt) {
Command cmd = COMMAND.get(stmt);

Getter<Command,Prepared> p = prepared;

if (p == null) {
Class<? extends Command> cls = cmd.getClass();

assert cls.getSimpleName().equals("CommandContainer");

prepared = p = getter(cls, "prepared");
}

Prepared select = p.get(cmd);

return new GridSqlQueryParser().parse((Select)ses.prepare(select));
return new GridSqlQueryParser().parse((Select)select);
}

/**
Expand Down Expand Up @@ -510,7 +527,7 @@ private static void assert0(boolean cond, Object o) {
* @param cls Class.
* @param fldName Fld name.
*/
private static <T, R> Getter<T, R> getter(Class<T> cls, String fldName) {
private static <T, R> Getter<T, R> getter(Class<? extends T> cls, String fldName) {
Field field;

try {
Expand Down

0 comments on commit a06a557

Please sign in to comment.