Skip to content

Commit

Permalink
changes to limitExceeded calculation on dataStore
Browse files Browse the repository at this point in the history
  • Loading branch information
angelo.andreussi authored and Coduz committed Dec 11, 2023
1 parent 0d6777f commit 3606645
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class ResultList<T> {

private final List<T> result;
private final long totalCount;
private boolean totalHitsExceedsCount; //true iff in ES there are actually more than 10k hits

/**
* Constructor.
Expand All @@ -48,6 +49,14 @@ public void add(T object) {
result.add(object);
}

public void setTotalHitsExceedsCount(boolean value) {
this.totalHitsExceedsCount=value;
}

public boolean getTotalHitsExceedsCount() {
return this.totalHitsExceedsCount;
}

/**
* Gets the {@link List} of results.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ private ElasticsearchKeywords() {
static final String KEY_HITS = "hits";
static final String KEY_TOTAL = "total";
static final String KEY_VALUE = "value";
static final String KEY_RELATION = "relation";
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,18 @@ public <T> ResultList<T> query(TypeDescriptor typeDescriptor, Object query, Clas

long totalCount = 0;
ArrayNode resultsNode = null;
String totalRelation = null;

Request request = new Request(ElasticsearchKeywords.ACTION_GET, ElasticsearchResourcePaths.search(typeDescriptor));
request.setJsonEntity(json);
Response queryResponse = restCallTimeoutHandler(() -> getClient().performRequest(request), typeDescriptor.getIndex(), "QUERY");

if (isRequestSuccessful(queryResponse)) {
JsonNode responseNode = readResponseAsJsonNode(queryResponse);

JsonNode hitsNode = responseNode.path(ElasticsearchKeywords.KEY_HITS);

totalCount = hitsNode.path(ElasticsearchKeywords.KEY_TOTAL).path(ElasticsearchKeywords.KEY_VALUE).asLong();
totalRelation = hitsNode.path(ElasticsearchKeywords.KEY_TOTAL).path(ElasticsearchKeywords.KEY_RELATION).asText();
if (totalCount > Integer.MAX_VALUE) {
throw new ClientException(ClientErrorCodes.ACTION_ERROR, CLIENT_HITS_MAX_VALUE_EXCEEDED);
}
Expand All @@ -274,6 +277,9 @@ public <T> ResultList<T> query(TypeDescriptor typeDescriptor, Object query, Clas
}

ResultList<T> resultList = new ResultList<>(totalCount);
if (totalRelation != null) {
resultList.setTotalHitsExceedsCount(!totalRelation.equals("eq"));
}
Object queryFetchStyle = getModelConverter().getFetchStyle(query);
if (resultsNode != null && !resultsNode.isEmpty()) {
for (JsonNode result : resultsNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ protected ElasticsearchClient<?> getElasticsearchClient() throws ClientUnavailab
return DatastoreClientFactory.getElasticsearchClient();
}

protected <T extends Storable> void setLimitExceed(StorableQuery query, StorableListResult<T> list) {
protected <T extends Storable> void setLimitExceed(StorableQuery query, boolean hitsExceedsTotalCount, StorableListResult<T> list) {
int offset = query.getOffset() != null ? query.getOffset() : 0;
if (query.getLimit() != null && list.getTotalCount() > offset + query.getLimit()) {
list.setLimitExceeded(true);
if (query.getLimit() != null) {
if (hitsExceedsTotalCount || //pre-condition: there are more than 10k documents in ES && query limit is <= 10k
list.getTotalCount() > offset + query.getLimit()) {
list.setLimitExceeded(true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.kapua.service.datastore.model.ChannelInfoListResult;
import org.eclipse.kapua.service.datastore.model.query.ChannelInfoQuery;
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -193,8 +194,9 @@ public ChannelInfoListResult query(ChannelInfoQuery query) throws KapuaIllegalAr

String indexName = SchemaUtil.getChannelIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ChannelInfoSchema.CHANNEL_TYPE_NAME);
ChannelInfoListResult result = new ChannelInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, ChannelInfo.class));
setLimitExceed(query, result);
ResultList<ChannelInfo> rl = getElasticsearchClient().query(typeDescriptor, query, ChannelInfo.class);
ChannelInfoListResult result = new ChannelInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.kapua.service.datastore.model.ClientInfoListResult;
import org.eclipse.kapua.service.datastore.model.query.ClientInfoQuery;
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -188,8 +189,9 @@ public ClientInfoListResult query(ClientInfoQuery query) throws KapuaIllegalArgu

String indexName = SchemaUtil.getClientIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, ClientInfoSchema.CLIENT_TYPE_NAME);
ClientInfoListResultImpl result = new ClientInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, ClientInfo.class));
setLimitExceed(query, result);
ResultList<ClientInfo> rl = getElasticsearchClient().query(typeDescriptor, query, ClientInfo.class);
ClientInfoListResult result = new ClientInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ public MessageListResult query(MessageQuery query)

String dataIndexName = SchemaUtil.getDataIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(dataIndexName, MessageSchema.MESSAGE_TYPE_NAME);
MessageListResult result = new MessageListResultImpl(getElasticsearchClient().query(typeDescriptor, query, DatastoreMessage.class));
setLimitExceed(query, result);
ResultList<DatastoreMessage> rl = getElasticsearchClient().query(typeDescriptor, query, DatastoreMessage.class);
MessageListResult result = new MessageListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.kapua.service.elasticsearch.client.exception.ClientException;
import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.BulkUpdateResponse;
import org.eclipse.kapua.service.elasticsearch.client.model.ResultList;
import org.eclipse.kapua.service.elasticsearch.client.model.TypeDescriptor;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateRequest;
import org.eclipse.kapua.service.elasticsearch.client.model.UpdateResponse;
Expand Down Expand Up @@ -257,8 +258,9 @@ public MetricInfoListResult query(MetricInfoQuery query) throws KapuaIllegalArgu

String indexNme = SchemaUtil.getMetricIndexName(query.getScopeId());
TypeDescriptor typeDescriptor = new TypeDescriptor(indexNme, MetricInfoSchema.METRIC_TYPE_NAME);
MetricInfoListResult result = new MetricInfoListResultImpl(getElasticsearchClient().query(typeDescriptor, query, MetricInfo.class));
setLimitExceed(query, result);
ResultList<MetricInfo> rl = getElasticsearchClient().query(typeDescriptor, query, MetricInfo.class);
MetricInfoListResult result = new MetricInfoListResultImpl(rl);
setLimitExceed(query, rl.getTotalHitsExceedsCount(), result);
return result;
}

Expand Down

0 comments on commit 3606645

Please sign in to comment.