diff --git a/persistence-elasticsearch/core/pom.xml b/persistence-elasticsearch/core/pom.xml
index 8aa4fd7baf..282e37dee8 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -238,6 +238,7 @@
true
+ <_dsannotations>*
org.jctools.queues;resolution:=optional,
org.apache.logging.log4j.util.internal;resolution:=optional,
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java
new file mode 100644
index 0000000000..04783bfa3a
--- /dev/null
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceConf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.persistence.elasticsearch;
+
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(name = "ElasticSearch persistence service config", description = "The configuration for the ElasticSearch persistence service")
+public @interface ElasticSearchPersistenceServiceConf {
+
+ String minimalElasticSearchVersion() default "7.0.0";
+ String maximalElasticSearchVersion() default "8.0.0";
+ String cluster_name() default "contextElasticSearch";
+ String elasticSearchAddresses() default "localhost:9200";
+ String index_prefix() default "context";
+ String username();
+ String password();
+ boolean sslEnable() default false;
+ boolean sslTrustAllCertificates() default false;
+ boolean throwExceptions() default false;
+ boolean alwaysOverwrite() default true;
+ String logLevelRestClient() default "ERROR";
+ String fatalIllegalStateErrors();
+
+ String numberOfShards() default "5";
+ String numberOfReplicas() default "0";
+ String indexMappingTotalFieldsLimit() default "1000";
+ String indexMaxDocValueFieldsSearch() default "1000";
+
+ String monthlyIndex_numberOfShards() default "3"; /* Deprecate use rollover prop instead */
+ String monthlyIndex_numberOfReplicas() default "0"; /* Deprecate use rollover prop instead */
+ String monthlyIndex_indexMappingTotalFieldsLimit() default "1000"; /* Deprecate use rollover prop instead */
+ String monthlyIndex_indexMaxDocValueFieldsSearch() default "1000"; /* Deprecate use rollover prop instead */
+ String monthlyIndex_itemsMonthlyIndexedOverride() default "event,session"; /* Deprecate use rollover prop instead */
+ String rollover_numberOfShards();
+ String rollover_numberOfReplicas();
+ String rollover_indexMappingTotalFieldsLimit();
+ String rollover_indexMaxDocValueFieldsSearch();
+ String rollover_indices();
+ String rollover_maxSize();
+ String rollover_maxAge() default "365d";
+ String rollover_maxDocs();
+
+ int defaultQueryLimit() default 10;
+ int removeByQueryTimeoutInMinutes() default 10;
+ int aggregateQueryBucketSize() default 5000;
+ int clientSocketTimeout() default -1;
+ int aggQueryMaxResponseSizeHttp() default -1;
+ boolean aggQueryThrowOnMissingDocs() default false;
+ boolean useBatchingForSave() default false;
+ boolean useBatchingForUpdate() default true;
+ String itemTypeToRefreshPolicy();
+
+ int bulkProcessor_concurrentRequests() default 1;
+ int bulkProcessor_bulkActions() default 1000;
+ String bulkProcessor_bulkSize() default "5MB";
+ String bulkProcessor_flushInterval() default "5s";
+ String bulkProcessor_backoffPolicy() default "exponential";
+}
\ No newline at end of file
diff --git a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 864548f3e7..dad8758d4d 100644
--- a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++ b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -36,9 +36,7 @@
import org.apache.unomi.metrics.MetricAdapter;
import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionContextHelper;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilder;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionESQueryBuilderDispatcher;
-import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluator;
import org.apache.unomi.persistence.elasticsearch.conditions.ConditionEvaluatorDispatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.BaseAggregate;
@@ -138,8 +136,9 @@
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
-import org.osgi.framework.ServiceReference;
import org.osgi.framework.SynchronousBundleListener;
+import org.osgi.service.component.annotations.*;
+import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,313 +168,88 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+@Component(immediate = true, service = {PersistenceService.class, SynchronousBundleListener.class})
+@Designate(ocd = ElasticSearchPersistenceServiceConf.class)
@SuppressWarnings("rawtypes")
public class ElasticSearchPersistenceServiceImpl implements PersistenceService, SynchronousBundleListener {
- public static final String BULK_PROCESSOR_CONCURRENT_REQUESTS = "bulkProcessor.concurrentRequests";
- public static final String BULK_PROCESSOR_BULK_ACTIONS = "bulkProcessor.bulkActions";
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
+
public static final String BULK_PROCESSOR_BULK_SIZE = "bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL = "bulkProcessor.flushInterval";
public static final String BULK_PROCESSOR_BACKOFF_POLICY = "bulkProcessor.backoffPolicy";
- public static final String MONTHLY_INDEX_ITEMS_MONTHLY_INDEXED = "monthlyIndex.itemsMonthlyIndexedOverride";
public static final String INDEX_DATE_PREFIX = "date-";
public static final String SEQ_NO = "seq_no";
public static final String PRIMARY_TERM = "primary_term";
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchPersistenceServiceImpl.class.getName());
private static final String ROLLOVER_LIFECYCLE_NAME = "unomi-rollover-policy";
- private boolean throwExceptions = false;
- private RestHighLevelClient client;
- private BulkProcessor bulkProcessor;
- private String elasticSearchAddresses;
- private List elasticSearchAddressList = new ArrayList<>();
- private String clusterName;
- private String indexPrefix;
- private String monthlyIndexNumberOfShards;
- private String monthlyIndexNumberOfReplicas;
- private String monthlyIndexMappingTotalFieldsLimit;
- private String monthlyIndexMaxDocValueFieldsSearch;
- private String numberOfShards;
- private String numberOfReplicas;
- private String indexMappingTotalFieldsLimit;
- private String indexMaxDocValueFieldsSearch;
- private String[] fatalIllegalStateErrors;
- private BundleContext bundleContext;
- private Map mappings = new HashMap();
+
+ private ElasticSearchPersistenceServiceConf conf;
private ConditionEvaluatorDispatcher conditionEvaluatorDispatcher;
private ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher;
- private List itemsMonthlyIndexed;
- private Map routingByType;
-
- private Integer defaultQueryLimit = 10;
- private Integer removeByQueryTimeoutInMinutes = 10;
-
- private String bulkProcessorConcurrentRequests = "1";
- private String bulkProcessorBulkActions = "1000";
- private String bulkProcessorBulkSize = "5MB";
- private String bulkProcessorFlushInterval = "5s";
- private String bulkProcessorBackoffPolicy = "exponential";
-
- // Rollover configuration
- private List rolloverIndices;
- private String rolloverMaxSize;
- private String rolloverMaxAge;
- private String rolloverMaxDocs;
- private String rolloverIndexNumberOfShards;
- private String rolloverIndexNumberOfReplicas;
- private String rolloverIndexMappingTotalFieldsLimit;
- private String rolloverIndexMaxDocValueFieldsSearch;
-
- private String minimalElasticSearchVersion = "7.0.0";
- private String maximalElasticSearchVersion = "8.0.0";
-
- // authentication props
- private String username;
- private String password;
- private boolean sslEnable = false;
- private boolean sslTrustAllCertificates = false;
-
- private int aggregateQueryBucketSize = 5000;
-
private MetricsService metricsService;
- private boolean useBatchingForSave = false;
- private boolean useBatchingForUpdate = true;
- private String logLevelRestClient = "ERROR";
- private boolean alwaysOverwrite = true;
- private boolean aggQueryThrowOnMissingDocs = false;
- private Integer aggQueryMaxResponseSizeHttp = null;
- private Integer clientSocketTimeout = null;
- private Map itemTypeToRefreshPolicy = new HashMap<>();
+ private BundleContext bundleContext;
+ private List itemsMonthlyIndexed;
+ private List rolloverIndices;
+ private Map mappings = new HashMap();
private Map>> knownMappings = new HashMap<>();
+ private RestHighLevelClient client;
+ private BulkProcessor bulkProcessor;
+ private List elasticSearchAddressList = new ArrayList<>();
+ private String[] fatalIllegalStateErrors = new String[]{};
+ private Map itemTypeToRefreshPolicy = new HashMap<>();
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public void setElasticSearchAddresses(String elasticSearchAddresses) {
- this.elasticSearchAddresses = elasticSearchAddresses;
- String[] elasticSearchAddressesArray = elasticSearchAddresses.split(",");
- elasticSearchAddressList.clear();
- for (String elasticSearchAddress : elasticSearchAddressesArray) {
- elasticSearchAddressList.add(elasticSearchAddress.trim());
- }
- }
-
- public void setItemTypeToRefreshPolicy(String itemTypeToRefreshPolicy) throws IOException {
- if (!itemTypeToRefreshPolicy.isEmpty()) {
- this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(itemTypeToRefreshPolicy,
- new TypeReference>() {
- });
- }
- }
-
- public void setFatalIllegalStateErrors(String fatalIllegalStateErrors) {
- this.fatalIllegalStateErrors = Arrays.stream(fatalIllegalStateErrors.split(","))
- .map(i -> i.trim()).filter(i -> !i.isEmpty()).toArray(String[]::new);
- }
-
- public void setAggQueryMaxResponseSizeHttp(String aggQueryMaxResponseSizeHttp) {
- if (StringUtils.isNumeric(aggQueryMaxResponseSizeHttp)) {
- this.aggQueryMaxResponseSizeHttp = Integer.parseInt(aggQueryMaxResponseSizeHttp);
- }
- }
-
- public void setIndexPrefix(String indexPrefix) {
- this.indexPrefix = indexPrefix;
- }
-
- @Deprecated
- public void setMonthlyIndexNumberOfShards(String monthlyIndexNumberOfShards) {
- this.monthlyIndexNumberOfShards = monthlyIndexNumberOfShards;
- }
-
- @Deprecated
- public void setMonthlyIndexNumberOfReplicas(String monthlyIndexNumberOfReplicas) {
- this.monthlyIndexNumberOfReplicas = monthlyIndexNumberOfReplicas;
- }
-
- @Deprecated
- public void setMonthlyIndexMappingTotalFieldsLimit(String monthlyIndexMappingTotalFieldsLimit) {
- this.monthlyIndexMappingTotalFieldsLimit = monthlyIndexMappingTotalFieldsLimit;
- }
-
- @Deprecated
- public void setMonthlyIndexMaxDocValueFieldsSearch(String monthlyIndexMaxDocValueFieldsSearch) {
- this.monthlyIndexMaxDocValueFieldsSearch = monthlyIndexMaxDocValueFieldsSearch;
- }
-
- @Deprecated
- public void setItemsMonthlyIndexedOverride(String itemsMonthlyIndexedOverride) {
- this.itemsMonthlyIndexed = StringUtils.isNotEmpty(itemsMonthlyIndexedOverride) ? Arrays.asList(itemsMonthlyIndexedOverride.split(",").clone()) : Collections.emptyList();
- }
-
- public void setNumberOfShards(String numberOfShards) {
- this.numberOfShards = numberOfShards;
- }
-
- public void setNumberOfReplicas(String numberOfReplicas) {
- this.numberOfReplicas = numberOfReplicas;
- }
-
- public void setIndexMappingTotalFieldsLimit(String indexMappingTotalFieldsLimit) {
- this.indexMappingTotalFieldsLimit = indexMappingTotalFieldsLimit;
- }
-
- public void setIndexMaxDocValueFieldsSearch(String indexMaxDocValueFieldsSearch) {
- this.indexMaxDocValueFieldsSearch = indexMaxDocValueFieldsSearch;
- }
-
- public void setDefaultQueryLimit(Integer defaultQueryLimit) {
- this.defaultQueryLimit = defaultQueryLimit;
- }
-
- public void setRoutingByType(Map routingByType) {
- this.routingByType = routingByType;
- }
-
+ @Reference
public void setConditionEvaluatorDispatcher(ConditionEvaluatorDispatcher conditionEvaluatorDispatcher) {
this.conditionEvaluatorDispatcher = conditionEvaluatorDispatcher;
}
+ @Reference
public void setConditionESQueryBuilderDispatcher(ConditionESQueryBuilderDispatcher conditionESQueryBuilderDispatcher) {
this.conditionESQueryBuilderDispatcher = conditionESQueryBuilderDispatcher;
}
- public void setBulkProcessorConcurrentRequests(String bulkProcessorConcurrentRequests) {
- this.bulkProcessorConcurrentRequests = bulkProcessorConcurrentRequests;
- }
-
- public void setBulkProcessorBulkActions(String bulkProcessorBulkActions) {
- this.bulkProcessorBulkActions = bulkProcessorBulkActions;
- }
-
- public void setBulkProcessorBulkSize(String bulkProcessorBulkSize) {
- this.bulkProcessorBulkSize = bulkProcessorBulkSize;
- }
-
- public void setBulkProcessorFlushInterval(String bulkProcessorFlushInterval) {
- this.bulkProcessorFlushInterval = bulkProcessorFlushInterval;
- }
-
- public void setBulkProcessorBackoffPolicy(String bulkProcessorBackoffPolicy) {
- this.bulkProcessorBackoffPolicy = bulkProcessorBackoffPolicy;
- }
-
- public void setRolloverIndices(String rolloverIndices) {
- this.rolloverIndices = StringUtils.isNotEmpty(rolloverIndices) ? Arrays.asList(rolloverIndices.split(",").clone()) : null;
- }
-
- public void setRolloverMaxSize(String rolloverMaxSize) {
- this.rolloverMaxSize = rolloverMaxSize;
- }
-
- public void setRolloverMaxAge(String rolloverMaxAge) {
- this.rolloverMaxAge = rolloverMaxAge;
- }
-
- public void setRolloverMaxDocs(String rolloverMaxDocs) {
- this.rolloverMaxDocs = rolloverMaxDocs;
- }
-
- public void setRolloverIndexNumberOfShards(String rolloverIndexNumberOfShards) {
- this.rolloverIndexNumberOfShards = rolloverIndexNumberOfShards;
- }
-
- public void setRolloverIndexNumberOfReplicas(String rolloverIndexNumberOfReplicas) {
- this.rolloverIndexNumberOfReplicas = rolloverIndexNumberOfReplicas;
- }
-
- public void setRolloverIndexMappingTotalFieldsLimit(String rolloverIndexMappingTotalFieldsLimit) {
- this.rolloverIndexMappingTotalFieldsLimit = rolloverIndexMappingTotalFieldsLimit;
- }
-
- public void setRolloverIndexMaxDocValueFieldsSearch(String rolloverIndexMaxDocValueFieldsSearch) {
- this.rolloverIndexMaxDocValueFieldsSearch = rolloverIndexMaxDocValueFieldsSearch;
- }
-
- public void setMinimalElasticSearchVersion(String minimalElasticSearchVersion) {
- this.minimalElasticSearchVersion = minimalElasticSearchVersion;
- }
-
- public void setMaximalElasticSearchVersion(String maximalElasticSearchVersion) {
- this.maximalElasticSearchVersion = maximalElasticSearchVersion;
- }
-
- public void setAggregateQueryBucketSize(int aggregateQueryBucketSize) {
- this.aggregateQueryBucketSize = aggregateQueryBucketSize;
- }
-
- public void setClientSocketTimeout(String clientSocketTimeout) {
- if (StringUtils.isNumeric(clientSocketTimeout)) {
- this.clientSocketTimeout = Integer.parseInt(clientSocketTimeout);
- }
- }
-
+ @Reference
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
- public void setUseBatchingForSave(boolean useBatchingForSave) {
- this.useBatchingForSave = useBatchingForSave;
- }
-
- public void setUseBatchingForUpdate(boolean useBatchingForUpdate) {
- this.useBatchingForUpdate = useBatchingForUpdate;
- }
-
- public void setUsername(String username) {
- this.username = username;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public void setSslEnable(boolean sslEnable) {
- this.sslEnable = sslEnable;
- }
-
- public void setSslTrustAllCertificates(boolean sslTrustAllCertificates) {
- this.sslTrustAllCertificates = sslTrustAllCertificates;
- }
-
-
- public void setAggQueryThrowOnMissingDocs(boolean aggQueryThrowOnMissingDocs) {
- this.aggQueryThrowOnMissingDocs = aggQueryThrowOnMissingDocs;
- }
-
- public void setThrowExceptions(boolean throwExceptions) {
- this.throwExceptions = throwExceptions;
- }
-
- public void setAlwaysOverwrite(boolean alwaysOverwrite) {
- this.alwaysOverwrite = alwaysOverwrite;
- }
-
- public void setLogLevelRestClient(String logLevelRestClient) {
- this.logLevelRestClient = logLevelRestClient;
- }
+ @Activate
+ public void start(ElasticSearchPersistenceServiceConf conf, BundleContext bundleContext) throws Exception {
+ this.bundleContext = bundleContext;
+ this.conf = conf;
- public void start() throws Exception {
+ // init some data based on conf
+ if (StringUtils.isNotEmpty(conf.elasticSearchAddresses())) {
+ this.elasticSearchAddressList = Arrays.stream(conf.elasticSearchAddresses().split(",")).map(String::trim).filter(i -> !i.isEmpty()).collect(Collectors.toList());
+ }
+ if (StringUtils.isNotEmpty(conf.itemTypeToRefreshPolicy())){
+ this.itemTypeToRefreshPolicy = new ObjectMapper().readValue(conf.itemTypeToRefreshPolicy(), new TypeReference>() {});
+ }
+ if (StringUtils.isNotEmpty(conf.fatalIllegalStateErrors())) {
+ this.fatalIllegalStateErrors = Arrays.stream(conf.fatalIllegalStateErrors().split(",")).map(String::trim).filter(i -> !i.isEmpty()).toArray(String[]::new);
+ }
+ if (StringUtils.isNotEmpty(conf.monthlyIndex_itemsMonthlyIndexedOverride())) {
+ this.itemsMonthlyIndexed = Arrays.asList(conf.monthlyIndex_itemsMonthlyIndexedOverride().split(",").clone());
+ }
+ if (StringUtils.isNotEmpty(conf.rollover_indices())) {
+ this.rolloverIndices = Arrays.asList(conf.rollover_indices().split(",").clone());
+ }
// Work around to avoid ES Logs regarding the deprecated [ignore_throttled] parameter
try {
- Level lvl = Level.toLevel(logLevelRestClient, Level.ERROR);
+ Level lvl = Level.toLevel(conf.logLevelRestClient(), Level.ERROR);
org.apache.log4j.Logger.getLogger("org.elasticsearch.client.RestClient").setLevel(lvl);
} catch (Exception e) {
// Never fail because of the set of the logger
}
// on startup
- new InClassLoaderExecute