Skip to content
Permalink
Browse files
IGNITE-17511 Java ThinClient supports IndexQuery (#10188)
  • Loading branch information
timoninmaxim committed Aug 12, 2022
1 parent 3c8296a commit a9d49f2ad761cb014c0bb27f9a806004dbf014a2
Show file tree
Hide file tree
Showing 8 changed files with 607 additions and 3 deletions.
@@ -166,6 +166,11 @@ public enum ClientOperationType {
*/
QUERY_CONTINUOUS,

/**
* Index query ({@link ClientCache#query(Query)}).
*/
QUERY_INDEX,

/**
* Start transaction ({@link ClientTransactions#txStart}).
*/
@@ -150,6 +150,12 @@ public enum ClientOperation {
/** Continuous query event. */
QUERY_CONTINUOUS_EVENT(2007, ClientNotificationType.CONTINUOUS_QUERY_EVENT),

/** Index query event. */
QUERY_INDEX(2008),

/** Query index cursor get page. */
QUERY_INDEX_CURSOR_GET_PAGE(2009),

/** Get binary type name. */
GET_BINARY_TYPE_NAME(3000),

@@ -395,6 +401,9 @@ public ClientNotificationType notificationType() {
case QUERY_CONTINUOUS:
return ClientOperationType.QUERY_CONTINUOUS;

case QUERY_INDEX:
return ClientOperationType.QUERY_INDEX;

case TX_START:
return ClientOperationType.TRANSACTION_START;

@@ -33,6 +33,8 @@
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
@@ -44,16 +46,20 @@
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;

@@ -747,6 +753,8 @@ else if (qry instanceof SqlFieldsQuery)
res = (QueryCursor<R>)query((SqlFieldsQuery)qry);
else if (qry instanceof ContinuousQuery)
res = query((ContinuousQuery<K, V>)qry, null);
else if (qry instanceof IndexQuery)
res = indexQuery((IndexQuery)qry);
else
throw new IllegalArgumentException(
String.format("Query of type [%s] is not supported", qry.getClass().getSimpleName())
@@ -947,6 +955,71 @@ private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
));
}

/** Handle index query. */
private QueryCursor<Cache.Entry<K, V>> indexQuery(IndexQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);

BinaryOutputStream out = payloadCh.out();

try (BinaryRawWriterEx w = new BinaryWriterExImpl(marsh.context(), out, null, null)) {
w.writeInt(qry.getPageSize());
w.writeBoolean(qry.isLocal());
w.writeInt(qry.getPartition() == null ? -1 : qry.getPartition());

w.writeString(qry.getValueType());
w.writeString(qry.getIndexName());

if (qry.getCriteria() != null) {
out.writeByte(ARR_LIST);
out.writeInt(qry.getCriteria().size());

for (IndexQueryCriterion c: qry.getCriteria()) {
if (c instanceof RangeIndexQueryCriterion) {
out.writeByte((byte)0); // Criterion type.

RangeIndexQueryCriterion range = (RangeIndexQueryCriterion)c;

w.writeString(range.field());
w.writeBoolean(range.lowerIncl());
w.writeBoolean(range.upperIncl());
w.writeBoolean(range.lowerNull());
w.writeBoolean(range.upperNull());

serDes.writeObject(out, range.lower());
serDes.writeObject(out, range.upper());
}
else {
throw new IllegalArgumentException(
String.format("Unknown IndexQuery criterion type [%s]", c.getClass().getSimpleName())
);
}
}
}
else
out.writeByte(GridBinaryMarshaller.NULL);
}

if (qry.getFilter() == null)
out.writeByte(GridBinaryMarshaller.NULL);
else {
serDes.writeObject(out, qry.getFilter());
out.writeByte(JAVA_PLATFORM);
}
};

return new ClientQueryCursor<>(new ClientQueryPager<>(
ch,
ClientOperation.QUERY_INDEX,
ClientOperation.QUERY_INDEX_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
marsh,
cacheId,
qry.getPartition() == null ? -1 : qry.getPartition()
));
}

/** Handle SQL query. */
private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -53,6 +53,7 @@
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetOrCreateWithNameRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetSizeRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheIndexQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
@@ -252,6 +253,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** */
public static final short OP_QUERY_CONTINUOUS_EVENT_NOTIFICATION = 2007;

/** */
private static final short OP_QUERY_INDEX = 2008;

/** */
private static final short OP_QUERY_INDEX_CURSOR_GET_PAGE = 2009;

/* Binary metadata operations. */
/** */
private static final short OP_BINARY_TYPE_NAME_GET = 3000;
@@ -459,6 +466,8 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) {
case OP_QUERY_SCAN_CURSOR_GET_PAGE:

case OP_QUERY_SQL_CURSOR_GET_PAGE:

case OP_QUERY_INDEX_CURSOR_GET_PAGE:
return new ClientCacheQueryNextPageRequest(reader);

case OP_RESOURCE_CLOSE:
@@ -575,6 +584,9 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) {
case OP_QUERY_CONTINUOUS:
return new ClientCacheQueryContinuousRequest(reader);

case OP_QUERY_INDEX:
return new ClientCacheIndexQueryRequest(reader);

case OP_TX_START:
return new ClientTxStartRequest(reader);

@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.platform.client.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;

import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;

/**
* IndexQuery request.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientCacheIndexQueryRequest extends ClientCacheRequest {
/** IndexQuery. */
private final IndexQuery qry;

/** Page size. */
private final int pageSize;

/**
* @param reader Reader.
*/
public ClientCacheIndexQueryRequest(BinaryRawReaderEx reader) {
super(reader);

pageSize = reader.readInt();

boolean loc = reader.readBoolean();

int part = reader.readInt();

String valType = reader.readString();

String idxName = reader.readString();

byte arrMark = reader.readByte();

List<IndexQueryCriterion> criteria = null;

if (arrMark == ARR_LIST) {
int critSize = reader.readInt();

criteria = new ArrayList<>(critSize);

for (int i = 0; i < critSize; i++)
criteria.add(readCriterion(reader));
}

Object filterObj = reader.readObjectDetached();

qry = new IndexQuery(valType, idxName);

qry.setPageSize(pageSize);
qry.setLocal(loc);

if (part >= 0)
qry.setPartition(part);

if (criteria != null)
qry.setCriteria(Arrays.asList(criteria.toArray()));

if (filterObj != null)
qry.setFilter(((BinaryObject)filterObj).deserialize());
}

/** */
private IndexQueryCriterion readCriterion(BinaryRawReaderEx reader) {
byte type = reader.readByte();

if (type == 0) {
String field = reader.readString();

boolean lowerIncl = reader.readBoolean();
boolean upperIncl = reader.readBoolean();
boolean lowerNull = reader.readBoolean();
boolean upperNull = reader.readBoolean();

Object lower = reader.readObjectDetached();
Object upper = reader.readObjectDetached();

RangeIndexQueryCriterion r = new RangeIndexQueryCriterion(field, lower, upper);
r.lowerIncl(lowerIncl);
r.upperIncl(upperIncl);
r.lowerNull(lowerNull);
r.upperNull(upperNull);

return r;
}

throw new IgniteException("Unknown IndexQuery criterion type: " + type);
}

/**
* {@inheritDoc}
*/
@Override public ClientResponse process(ClientConnectionContext ctx) {
IgniteCache cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);

ctx.incrementCursors();

try {
QueryCursor cur = cache.query(qry);

ClientCacheEntryQueryCursor cliCur = new ClientCacheEntryQueryCursor(cur, pageSize, ctx);

long cursorId = ctx.resources().put(cliCur);

cliCur.id(cursorId);

return new ClientCacheQueryResponse(requestId(), cliCur);
}
catch (Exception e) {
ctx.decrementCursors();

throw e;
}
}
}
@@ -316,13 +316,13 @@ public void testRetryPolicyConvertOpAllOperationsSupported() {

String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));

long expectedNullCount = 18;
long expectedNullCnt = 19;

String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
+ nullOpsNames;

assertEquals(msg, expectedNullCount, nullOps.size());
assertEquals(msg, expectedNullCnt, nullOps.size());
}

/**
@@ -41,7 +41,8 @@
IndexQueryWrongIndexTest.class,
MultifieldIndexQueryTest.class,
MultiTableIndexQuery.class,
RepeatedFieldIndexQueryTest.class
RepeatedFieldIndexQueryTest.class,
ThinClientIndexQueryTest.class
})
public class IndexQueryTestSuite {
}

0 comments on commit a9d49f2

Please sign in to comment.