Skip to content

Commit

Permalink
fix(datastore): improve performances on messages store
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese authored and Coduz committed Dec 6, 2023
1 parent 42c124a commit ab9319f
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "day"
When I delete all indices
And I select account "kapua-sys"
Expand All @@ -80,7 +80,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "hour"
When I delete all indices
And I select account "kapua-sys"
Expand All @@ -98,7 +98,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "week"
When I delete all indices
And I select account "kapua-sys"
Expand All @@ -119,7 +119,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "day"
When I delete all indices
And I select account "kapua-sys"
Expand All @@ -140,7 +140,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "hour"
When I delete all indices
And I select account "kapua-sys"
Expand All @@ -160,7 +160,7 @@ Feature: Datastore tests

Given Server with host "127.0.0.1" on port "9200"
And I login as user with name "kapua-sys" and password "kapua-password"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP"
Given Dataservice config enabled "true", dataTTL 30, rxByteLimit 0, dataIndexBy "DEVICE_TIMESTAMP", messageUniquenessCheck "FULL"
And System property "datastore.index.window" with value "week"
And I configure account service
| type | name | value |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
import org.eclipse.kapua.service.datastore.internal.model.DatastoreMessageImpl;
import org.eclipse.kapua.service.datastore.internal.model.MessageListResultImpl;
import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.query.ChannelInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.ClientInfoQueryImpl;
import org.eclipse.kapua.service.datastore.internal.model.query.MessageQueryImpl;
Expand Down Expand Up @@ -154,8 +155,10 @@ public StorableId store(KapuaMessage<?, ?> message, String messageId, boolean ne
String indexName = schemaMetadata.getDataIndexName();
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);

if (!newInsert) {
DatastoreMessage datastoreMessage = find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
if (!newInsert && !MessageUniquenessCheck.NONE.equals(accountServicePlan.getMessageUniquenessCheck())) {
DatastoreMessage datastoreMessage = MessageUniquenessCheck.FULL.equals(accountServicePlan.getMessageUniquenessCheck()) ?
find(message.getScopeId(), STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT) :
find(message.getScopeId(), indexName, STORABLE_ID_FACTORY.newStorableId(messageId), StorableFetchStyle.SOURCE_SELECT);
if (datastoreMessage != null) {
LOG.debug("Message with datatstore id '{}' already found", messageId);
metricMessagesAlreadyInTheDatastoreCount.inc();
Expand Down Expand Up @@ -249,7 +252,23 @@ public void delete(KapuaId scopeId, StorableId id)
* @throws QueryMappingException
* @throws ClientException
*/
public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle)
public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle fetchStyle) throws KapuaIllegalArgumentException, ClientException {
ArgumentValidator.notNull(scopeId, SCOPE_ID);
return find(scopeId, SchemaUtil.getDataIndexName(scopeId), id, fetchStyle);
}

/**
* Find message by identifier
*
* @param scopeId
* @param id
* @param fetchStyle
* @return
* @throws KapuaIllegalArgumentException
* @throws QueryMappingException
* @throws ClientException
*/
public DatastoreMessage find(KapuaId scopeId, String indexName, StorableId id, StorableFetchStyle fetchStyle)
throws KapuaIllegalArgumentException, ClientException {

ArgumentValidator.notNull(scopeId, SCOPE_ID);
Expand All @@ -263,7 +282,7 @@ public DatastoreMessage find(KapuaId scopeId, StorableId id, StorableFetchStyle
idsPredicate.addId(id);
idsQuery.setPredicate(idsPredicate);

String indexName = SchemaUtil.getDataIndexName(scopeId);
// String indexName = SchemaUtil.getDataIndexName(scopeId);
TypeDescriptor typeDescriptor = new TypeDescriptor(indexName, MessageSchema.MESSAGE_TYPE_NAME);
return getElasticsearchClient().find(typeDescriptor, idsQuery, DatastoreMessage.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import org.eclipse.kapua.commons.util.KapuaDateUtils;
import org.eclipse.kapua.service.datastore.internal.model.DataIndexBy;
import org.eclipse.kapua.service.datastore.internal.model.MessageUniquenessCheck;
import org.eclipse.kapua.service.datastore.internal.model.metric.MetricsIndexBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,12 @@ public class MessageStoreConfiguration {
*/
public static final String CONFIGURATION_METRICS_INDEX_BY_KEY = "metricsIndexBy";

/**
* Message Uniqueness Check key (available options are in MessageUniquenessCheck enumeration).<br>
* <b>The key must be aligned with the key used in org.eclipse.kapua.service.datastore.MessageStoreService.xml meta data configuration file).</b>
*/
public static final String CONFIGURATION_MESSAGE_UNIQUENESS_CHECK = "messageUniquenessCheck";

/**
* Defines a value in service plan as unlimited resource
*/
Expand All @@ -87,6 +94,7 @@ public class MessageStoreConfiguration {
private long rxByteLimit = 1000000;
private DataIndexBy dataIndexBy = DataIndexBy.SERVER_TIMESTAMP;
private MetricsIndexBy metricsIndexBy = MetricsIndexBy.TIMESTAMP;
private MessageUniquenessCheck messageUniquenessCheck;

private Map<String, Object> values;

Expand Down Expand Up @@ -120,6 +128,9 @@ public MessageStoreConfiguration(Map<String, Object> values) {
if (this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY) != null) {
setMetricsIndexBy(MetricsIndexBy.valueOf((String) this.values.get(CONFIGURATION_METRICS_INDEX_BY_KEY)));
}
if (this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK) != null) {
setMessageUniquenessCheck(MessageUniquenessCheck.valueOf((String) this.values.get(CONFIGURATION_MESSAGE_UNIQUENESS_CHECK)));
}
}
}

Expand Down Expand Up @@ -231,4 +242,20 @@ public MetricsIndexBy getMetricsIndexBy() {
public void setMetricsIndexBy(MetricsIndexBy metricsIndexBy) {
this.metricsIndexBy = metricsIndexBy;
}

/**
* Get the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
* @return
*/
public MessageUniquenessCheck getMessageUniquenessCheck() {
return messageUniquenessCheck;
}

/**
* Set the message uniqueness check parameter ({@link MessageStoreConfiguration#CONFIGURATION_MESSAGE_UNIQUENESS_CHECK}
*/
public void setMessageUniquenessCheck(MessageUniquenessCheck messageUniquenessCheck) {
this.messageUniquenessCheck = messageUniquenessCheck;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*******************************************************************************
* Copyright (c) 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.service.datastore.internal.model;

/**
* Once a message is going to be stored to datastore the store call can terminate with error on client side (due to timeout for example) but performed on server side.
* The store call is then retried and the message could be inserted twice.
* To avoid that, the current implementation does a query looking for a message with a specific id (the one from the message) in all the indexes belonging to the account.
* This is safer since changes in the message indexing by or the settings of the datastore (index by week/day/hour) can affect the index where the message should be stored to and then the effectivness of the check.
* But this has a performance drawback. The number of queries to be performed are linear with the indexes available so, if there are a lot of indexes, the query will need more time and resources to be executed.
* This enum define a new parameter to change the search behavior by account.
*/
public enum MessageUniquenessCheck {

/**
* No check
*/
NONE,
/**
* The search is done only to the index where the message is expected to be, based on current configuration.
*/
BOUND,
/**
* Will check in all the indexes defined for the account
*/
FULL
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@
<Option label="SERVER_TIMESTAMP" value="SERVER_TIMESTAMP" />
</AD>

<AD id="messageUniquenessCheck"
name="messageUniquenessCheck"
type="String"
cardinality="0"
required="true"
default="FULL"
description="Message uniqueness check type (on telemetry message datastore insert)">
<Option label="NONE" value="NONE" />
<Option label="BOUND" value="BOUND" />
<Option label="FULL" value="FULL" />
</AD>

</OCD>

<Designate pid="org.eclipse.kapua.service.datastore.MessageStoreService">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1800,13 +1800,14 @@ public void checkListForNamedMetricOrdering(String lstKey) {
checkListOrder(metLst, getNamedMetricOrdering());
}

@Given("^Dataservice config enabled (.*), dataTTL (\\d+), rxByteLimit (\\d+), dataIndexBy \"(.*)\"$")
public void configureDatastoreService(String enabled, int dataTTL, int rxByteLimit, String dataIndexBy) throws Exception {
@Given("^Dataservice config enabled (.*), dataTTL (\\d+), rxByteLimit (\\d+), dataIndexBy \"(.*)\", messageUniquenessCheck \"(.*)\"$")
public void configureDatastoreService(String enabled, int dataTTL, int rxByteLimit, String dataIndexBy, String messageUniquenessCheck) throws Exception {
Map<String, Object> settings = new HashMap<>();
settings.put("enabled", enabled.equalsIgnoreCase("TRUE"));
settings.put("dataTTL", dataTTL);
settings.put("rxByteLimit", rxByteLimit);
settings.put("dataIndexBy", dataIndexBy);
settings.put("messageUniquenessCheck", messageUniquenessCheck);
messageStoreService.setConfigValues(KapuaId.ONE, null, settings);
}

Expand Down

0 comments on commit ab9319f

Please sign in to comment.