Skip to content

Commit

Permalink
ignite-4955 - Correctly execute SQL queries started on replicated cac…
Browse files Browse the repository at this point in the history
…he. - Fixes #1806.

Signed-off-by: Sergi Vladykin <sergi.vladykin@gmail.com>
  • Loading branch information
svladykin committed Apr 17, 2017
1 parent 86c4058 commit ded599a
Show file tree
Hide file tree
Showing 15 changed files with 563 additions and 304 deletions.
Expand Up @@ -67,6 +67,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean distributedJoins;

/** */
private boolean replicatedOnly;

/**
* Constructs SQL fields query.
*
Expand Down Expand Up @@ -236,6 +239,28 @@ public boolean isDistributedJoins() {
return (SqlFieldsQuery)super.setLocal(loc);
}

/**
* Specify if the query contains only replicated tables.
* This is a hint for potentially more effective execution.
*
* @param replicatedOnly The query contains only replicated tables.
* @return {@code this} For chaining.
*/
public SqlFieldsQuery setReplicatedOnly(boolean replicatedOnly) {
this.replicatedOnly = replicatedOnly;

return this;
}

/**
* Check is the query contains only replicated tables.
*
* @return {@code true} If the query contains only replicated tables.
*/
public boolean isReplicatedOnly() {
return replicatedOnly;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlFieldsQuery.class, this);
Expand Down
Expand Up @@ -53,6 +53,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private boolean distributedJoins;

/** */
private boolean replicatedOnly;

/**
* Constructs query for the given type name and SQL query.
*
Expand Down Expand Up @@ -197,7 +200,7 @@ public SqlQuery<K, V> setTimeout(int timeout, TimeUnit timeUnit) {
* @param type Type.
* @return {@code this} For chaining.
*/
public SqlQuery setType(Class<?> type) {
public SqlQuery<K, V> setType(Class<?> type) {
return setType(QueryUtils.typeName(type));
}

Expand All @@ -210,7 +213,7 @@ public SqlQuery setType(Class<?> type) {
* @param distributedJoins Distributed joins enabled.
* @return {@code this} For chaining.
*/
public SqlQuery setDistributedJoins(boolean distributedJoins) {
public SqlQuery<K, V> setDistributedJoins(boolean distributedJoins) {
this.distributedJoins = distributedJoins;

return this;
Expand All @@ -219,12 +222,34 @@ public SqlQuery setDistributedJoins(boolean distributedJoins) {
/**
* Check if distributed joins are enabled for this query.
*
* @return {@code true} If distributed joind enabled.
* @return {@code true} If distributed joins enabled.
*/
public boolean isDistributedJoins() {
return distributedJoins;
}

/**
* Specify if the query contains only replicated tables.
* This is a hint for potentially more effective execution.
*
* @param replicatedOnly The query contains only replicated tables.
* @return {@code this} For chaining.
*/
public SqlQuery<K, V> setReplicatedOnly(boolean replicatedOnly) {
this.replicatedOnly = replicatedOnly;

return this;
}

/**
* Check is the query contains only replicated tables.
*
* @return {@code true} If the query contains only replicated tables.
*/
public boolean isReplicatedOnly() {
return replicatedOnly;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlQuery.class, this);
Expand Down
Expand Up @@ -780,7 +780,7 @@ private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, bool

final SqlQuery p = (SqlQuery)qry;

if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
opCtxCall != null && opCtxCall.isKeepBinary());

Expand All @@ -794,7 +794,7 @@ private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, bool

SqlFieldsQuery p = (SqlFieldsQuery)qry;

if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);

return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
Expand Down
Expand Up @@ -21,25 +21,19 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

/**
* Query.
*/
public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
public class GridCacheSqlQuery implements Message {
/** */
private static final long serialVersionUID = 0L;

Expand All @@ -50,24 +44,10 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
@GridToStringInclude(sensitive = true)
private String qry;

/** */
@GridToStringInclude(sensitive = true)
@GridDirectTransient
private Object[] params;

/** */
private byte[] paramsBytes;

/** */
@GridToStringInclude
@GridDirectTransient
private int[] paramIdxs;

/** */
@GridToStringInclude
@GridDirectTransient
private int paramsSize;

/** */
@GridToStringInclude
@GridDirectTransient
Expand Down Expand Up @@ -139,13 +119,6 @@ public GridCacheSqlQuery query(String qry) {
return this;
}

/**
* @return Parameters.
*/
public Object[] parameters() {
return params;
}

/**
* @return Parameter indexes.
*/
Expand All @@ -154,56 +127,15 @@ public int[] parameterIndexes() {
}

/**
* @param params Parameters.
* @param paramIdxs Parameter indexes.
* @return {@code this} For chaining.
* @return {@code this}.
*/
public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;

paramsSize = this.params.length;

public GridCacheSqlQuery parameterIndexes(int[] paramIdxs) {
this.paramIdxs = paramIdxs;

return this;
}

/** {@inheritDoc} */
@Override public void marshall(Marshaller m) {
if (paramsBytes != null)
return;

assert params != null;

try {
paramsBytes = U.marshal(m, params);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/** {@inheritDoc} */
@Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
if (params != null)
return;

assert paramsBytes != null;

try {
final ClassLoader ldr = U.resolveClassLoader(ctx.config());

if (m instanceof BinaryMarshaller)
// To avoid deserializing of enum types.
params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
else
params = U.unmarshal(m, paramsBytes, ldr);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
Expand Down Expand Up @@ -239,7 +171,7 @@ public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
writer.incrementState();

case 2:
if (!writer.writeByteArray("paramsBytes", paramsBytes))
if (!writer.writeIntArray("paramIdxs", paramIdxs))
return false;

writer.incrementState();
Expand Down Expand Up @@ -280,7 +212,7 @@ public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
reader.incrementState();

case 2:
paramsBytes = reader.readByteArray("paramsBytes");
paramIdxs = reader.readIntArray("paramIdxs");

if (!reader.isLastRead())
return false;
Expand Down Expand Up @@ -311,28 +243,17 @@ public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
}

/**
* @param args Arguments.
* @return Copy.
*/
public GridCacheSqlQuery copy(Object[] args) {
public GridCacheSqlQuery copy() {
GridCacheSqlQuery cp = new GridCacheSqlQuery();

cp.qry = qry;
cp.cols = cols;
cp.paramIdxs = paramIdxs;
cp.paramsSize = paramsSize;
cp.sort = sort;
cp.partitioned = partitioned;

if (F.isEmpty(args))
cp.params = EMPTY_PARAMS;
else {
cp.params = new Object[paramsSize];

for (int paramIdx : paramIdxs)
cp.params[paramIdx] = args[paramIdx];
}

return cp;
}

Expand Down Expand Up @@ -380,4 +301,27 @@ public GridCacheSqlQuery node(UUID node) {

return this;
}

/**
* @param allParams All parameters.
* @return Parameters only for this query.
*/
public Object[] parameters(Object[] allParams) {
if (F.isEmpty(paramIdxs))
return EMPTY_PARAMS;

assert !F.isEmpty(allParams);

int maxIdx = paramIdxs[paramIdxs.length - 1];

Object[] res = new Object[maxIdx + 1];

for (int i = 0; i < paramIdxs.length; i++) {
int idx = paramIdxs[i];

res[idx] = allParams[idx];
}

return res;
}
}
Expand Up @@ -95,7 +95,7 @@ public void distributedJoins(boolean distributedJoins) {
/**
* Check if distributed joins are enabled for this query.
*
* @return {@code true} If distributed joind enabled.
* @return {@code true} If distributed joins enabled.
*/
public boolean distributedJoins() {
return distributedJoins;
Expand Down Expand Up @@ -146,12 +146,23 @@ public int pageSize() {

/**
* @param qry SQL Query.
* @return {@code this}.
*/
public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) {
public void addMapQuery(GridCacheSqlQuery qry) {
mapQrys.add(qry);
}

/**
* @return {@code true} If all the map queries contain only replicated tables.
*/
public boolean isReplicatedOnly() {
assert !mapQrys.isEmpty();

for (int i = 0; i < mapQrys.size(); i++) {
if (mapQrys.get(i).isPartitioned())
return false;
}

return this;
return true;
}

/**
Expand Down Expand Up @@ -246,24 +257,23 @@ public void local(boolean local) {
}

/**
* @param args New arguments to copy with.
* @return Copy.
*/
public GridCacheTwoStepQuery copy(Object[] args) {
public GridCacheTwoStepQuery copy() {
assert !explain;

GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);

cp.caches = caches;
cp.extraCaches = extraCaches;
cp.spaces = spaces;
cp.rdc = rdc.copy(args);
cp.rdc = rdc.copy();
cp.skipMergeTbl = skipMergeTbl;
cp.pageSize = pageSize;
cp.distributedJoins = distributedJoins;

for (int i = 0; i < mapQrys.size(); i++)
cp.mapQrys.add(mapQrys.get(i).copy(args));
cp.mapQrys.add(mapQrys.get(i).copy());

return cp;
}
Expand Down

0 comments on commit ded599a

Please sign in to comment.