Skip to content

Commit

Permalink
Add migration to create events and system-events index sets and streams
Browse files Browse the repository at this point in the history
Closes #6085
  • Loading branch information
bernd committed Jul 5, 2019
1 parent 53d876c commit 1d7e176
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 13 deletions.
Expand Up @@ -17,26 +17,19 @@
package org.graylog2.configuration;

import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.ValidatorMethod;
import com.github.joschi.jadconfig.converters.StringListConverter;
import com.github.joschi.jadconfig.util.Duration;
import com.github.joschi.jadconfig.validators.FilePathReadableValidator;
import com.github.joschi.jadconfig.validators.InetPortValidator;
import com.github.joschi.jadconfig.validators.PositiveDurationValidator;
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
import com.github.joschi.jadconfig.validators.PositiveLongValidator;
import com.github.joschi.jadconfig.validators.StringNotBlankValidator;
import org.joda.time.Period;

import javax.validation.constraints.NotNull;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class ElasticsearchConfiguration {
public static final String DEFAULT_EVENTS_INDEX_PREFIX = "default_events_index_prefix";
public static final String DEFAULT_SYSTEM_EVENTS_INDEX_PREFIX = "default_system_events_index_prefix";

@Parameter(value = "elasticsearch_disable_version_check")
private boolean disableVersionCheck = false;

Expand Down Expand Up @@ -107,6 +100,12 @@ public class ElasticsearchConfiguration {
@Parameter(value = "index_field_type_periodical_interval", validator = PositiveDurationValidator.class)
private Duration indexFieldTypePeriodicalInterval = Duration.hours(1L);

@Parameter(value = DEFAULT_EVENTS_INDEX_PREFIX, validators = StringNotBlankValidator.class)
private String defaultEventsIndexPrefix = "gl-events";

@Parameter(value = DEFAULT_SYSTEM_EVENTS_INDEX_PREFIX, validators = StringNotBlankValidator.class)
private String defaultSystemEventsIndexPrefix = "gl-system-events";

public boolean isDisableVersionCheck() {
return disableVersionCheck;
}
Expand Down Expand Up @@ -195,4 +194,12 @@ public int getIndexOptimizationJobs() {
public Duration getIndexFieldTypePeriodicalInterval() {
return indexFieldTypePeriodicalInterval;
}

public String getDefaultEventsIndexPrefix() {
return defaultEventsIndexPrefix;
}

public String getDefaultSystemEventsIndexPrefix() {
return defaultSystemEventsIndexPrefix;
}
}
Expand Up @@ -48,6 +48,7 @@
public abstract class IndexSetConfig implements Comparable<IndexSetConfig> {
public static final String FIELD_INDEX_PREFIX = "index_prefix";
public static final String FIELD_CREATION_DATE = "creation_date";
public static final String FIELD_INDEX_TEMPLATE_TYPE = "index_template_type";
public static final String INDEX_PREFIX_REGEX = "^[a-z0-9][a-z0-9_+-]*$";

private static final Duration DEFAULT_FIELD_TYPE_REFRESH_INTERVAL = Duration.standardSeconds(5L);
Expand Down Expand Up @@ -126,7 +127,7 @@ public enum TemplateType {
@NotBlank
public abstract String indexTemplateName();

@JsonProperty("index_template_type")
@JsonProperty(FIELD_INDEX_TEMPLATE_TYPE)
@NotBlank
public abstract Optional<TemplateType> indexTemplateType();

Expand Down Expand Up @@ -157,7 +158,7 @@ public static IndexSetConfig create(@Id @ObjectId @JsonProperty("_id") @Nullable
@JsonProperty(FIELD_CREATION_DATE) @NotNull ZonedDateTime creationDate,
@JsonProperty("index_analyzer") @Nullable String indexAnalyzer,
@JsonProperty("index_template_name") @Nullable String indexTemplateName,
@JsonProperty("index_template_type") @Nullable TemplateType indexTemplateType,
@JsonProperty(FIELD_INDEX_TEMPLATE_TYPE) @Nullable TemplateType indexTemplateType,
@JsonProperty("index_optimization_max_num_segments") @Nullable Integer maxNumSegments,
@JsonProperty("index_optimization_disabled") @Nullable Boolean indexOptimizationDisabled,
@JsonProperty("field_type_refresh_interval") @Nullable Duration fieldTypeRefreshInterval) {
Expand Down
Expand Up @@ -37,5 +37,6 @@ protected void configure() {
addMigration(V2018070614390000_EnforceUniqueGrokPatterns.class);
addMigration(V20180718155800_AddContentPackIdAndRev.class);
addMigration(V20180924111644_AddDefaultGrokPatterns.class);
addMigration(V20190705071400_AddEventIndexSetsMigration.class);
}
}
@@ -0,0 +1,207 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.migrations;

import com.google.common.collect.ImmutableMap;
import com.mongodb.DuplicateKeyException;
import org.bson.types.ObjectId;
import org.graylog2.configuration.ElasticsearchConfiguration;
import org.graylog2.database.NotFoundException;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetValidator;
import org.graylog2.indexer.MongoIndexSet;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.indexset.IndexSetService;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategy;
import org.graylog2.indexer.retention.strategies.DeletionRetentionStrategyConfig;
import org.graylog2.indexer.rotation.strategies.TimeBasedRotationStrategy;
import org.graylog2.indexer.rotation.strategies.TimeBasedRotationStrategyConfig;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamImpl;
import org.graylog2.streams.StreamService;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.mongojack.DBQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import static java.util.Locale.US;
import static java.util.Objects.requireNonNull;

public class V20190705071400_AddEventIndexSetsMigration extends Migration {
private static final Logger LOG = LoggerFactory.getLogger(V20190705071400_AddEventIndexSetsMigration.class);

private final ElasticsearchConfiguration elasticsearchConfiguration;
private final MongoIndexSet.Factory mongoIndexSetFactory;
private final IndexSetService indexSetService;
private final IndexSetValidator indexSetValidator;
private final StreamService streamService;

@Inject
public V20190705071400_AddEventIndexSetsMigration(ElasticsearchConfiguration elasticsearchConfiguration,
MongoIndexSet.Factory mongoIndexSetFactory,
IndexSetService indexSetService,
IndexSetValidator indexSetValidator,
StreamService streamService) {
this.elasticsearchConfiguration = elasticsearchConfiguration;
this.mongoIndexSetFactory = mongoIndexSetFactory;
this.indexSetService = indexSetService;
this.indexSetValidator = indexSetValidator;
this.streamService = streamService;
}

@Override
public ZonedDateTime createdAt() {
return ZonedDateTime.parse("2019-07-05T07:14:00Z");
}

@Override
public void upgrade() {
ensureEventsStreamAndIndexSet(
"Graylog Events",
"Stores Graylog events.",
elasticsearchConfiguration.getDefaultEventsIndexPrefix(),
ElasticsearchConfiguration.DEFAULT_EVENTS_INDEX_PREFIX,
Stream.DEFAULT_EVENTS_STREAM_ID,
"All events",
"Stream containing all events created by Graylog"
);
ensureEventsStreamAndIndexSet(
"Graylog System Events",
"Stores Graylog system events.",
elasticsearchConfiguration.getDefaultSystemEventsIndexPrefix(),
ElasticsearchConfiguration.DEFAULT_SYSTEM_EVENTS_INDEX_PREFIX,
Stream.DEFAULT_SYSTEM_EVENTS_STREAM_ID,
"All system events",
"Stream containing all system events created by Graylog"
);
}

private void ensureEventsStreamAndIndexSet(String indexSetTitle,
String indexSetDescription,
String indexPrefix,
String indexPrefixConfigKey,
String streamId,
String streamTitle,
String streamDescription) {
checkIndexPrefixConflicts(indexPrefix, indexPrefixConfigKey);

final IndexSet eventsIndexSet = setupEventsIndexSet(indexSetTitle, indexSetDescription, indexPrefix);
try {
streamService.load(streamId);
} catch (NotFoundException ignored) {
createEventsStream(streamId, streamTitle, streamDescription, eventsIndexSet);
}
}

private void checkIndexPrefixConflicts(String indexPrefix, String configKey) {
final DBQuery.Query query = DBQuery.and(
DBQuery.notEquals(IndexSetConfig.FIELD_INDEX_TEMPLATE_TYPE, Optional.of(IndexSetConfig.TemplateType.EVENTS)),
DBQuery.is(IndexSetConfig.FIELD_INDEX_PREFIX, indexPrefix)
);

if (indexSetService.findOne(query).isPresent()) {
final String msg = String.format(US, "Index prefix conflict: a non-events index-set with prefix <%s> already exists. Configure a different <%s> value in the server config file.",
indexPrefix, configKey);
throw new IllegalStateException(msg);
}
}

private Optional<IndexSetConfig> getEventsIndexSetConfig(String indexPrefix) {
final DBQuery.Query query = DBQuery.and(
DBQuery.is(IndexSetConfig.FIELD_INDEX_TEMPLATE_TYPE, Optional.of(IndexSetConfig.TemplateType.EVENTS)),
DBQuery.is(IndexSetConfig.FIELD_INDEX_PREFIX, indexPrefix)
);
return indexSetService.findOne(query);
}

private IndexSet setupEventsIndexSet(String indexSetTitle, String indexSetDescription, String indexPrefix) {
final Optional<IndexSetConfig> optionalIndexSetConfig = getEventsIndexSetConfig(indexPrefix);
if (optionalIndexSetConfig.isPresent()) {
return mongoIndexSetFactory.create(optionalIndexSetConfig.get());
}

final IndexSetConfig indexSetConfig = IndexSetConfig.builder()
.title(indexSetTitle)
.description(indexSetDescription)
.indexTemplateType(IndexSetConfig.TemplateType.EVENTS)
.isWritable(true)
.indexPrefix(indexPrefix)
.shards(elasticsearchConfiguration.getShards())
.replicas(elasticsearchConfiguration.getReplicas())
.rotationStrategyClass(TimeBasedRotationStrategy.class.getCanonicalName())
.rotationStrategy(TimeBasedRotationStrategyConfig.create(Period.months(1)))
.retentionStrategyClass(DeletionRetentionStrategy.class.getCanonicalName())
.retentionStrategy(DeletionRetentionStrategyConfig.create(12))
.creationDate(ZonedDateTime.now(ZoneOffset.UTC))
.indexAnalyzer(elasticsearchConfiguration.getAnalyzer())
.indexTemplateName(indexPrefix+ "-template")
.indexOptimizationMaxNumSegments(elasticsearchConfiguration.getIndexOptimizationMaxNumSegments())
.indexOptimizationDisabled(elasticsearchConfiguration.isDisableIndexOptimization())
.fieldTypeRefreshInterval(Duration.standardMinutes(1))
.build();

try {
final Optional<IndexSetValidator.Violation> violation = indexSetValidator.validate(indexSetConfig);
if (violation.isPresent()) {
throw new RuntimeException(violation.get().message());
}

final IndexSetConfig savedIndexSet = indexSetService.save(indexSetConfig);

LOG.info("Successfully created events index-set <{}/{}>", savedIndexSet.id(), savedIndexSet.title());

return mongoIndexSetFactory.create(savedIndexSet);
} catch (DuplicateKeyException e) {
LOG.error("Couldn't create index-set <{}/{}>", indexSetTitle, indexPrefix);
throw new RuntimeException(e.getMessage());
}
}

private void createEventsStream(String streamId, String streamTitle, String streamDescription, IndexSet indexSet) {
final ObjectId id = new ObjectId(streamId);
final Map<String, Object> fields = ImmutableMap.<String, Object>builder()
.put(StreamImpl.FIELD_TITLE, streamTitle)
.put(StreamImpl.FIELD_DESCRIPTION, streamDescription)
.put(StreamImpl.FIELD_DISABLED, false)
.put(StreamImpl.FIELD_CREATED_AT, DateTime.now(DateTimeZone.UTC))
.put(StreamImpl.FIELD_CREATOR_USER_ID, "admin")
.put(StreamImpl.FIELD_MATCHING_TYPE, StreamImpl.MatchingType.DEFAULT.name())
.put(StreamImpl.FIELD_REMOVE_MATCHES_FROM_DEFAULT_STREAM, true)
.put(StreamImpl.FIELD_INDEX_SET_ID, requireNonNull(indexSet.getConfig().id(), "index set ID cannot be null"))
.put(StreamImpl.FIELD_DEFAULT_STREAM, false)
.build();
final Stream stream = new StreamImpl(id, fields, Collections.emptyList(), Collections.emptySet(), indexSet);

try {
streamService.save(stream);
LOG.info("Successfully created events stream <{}/{}>", stream.getId(), stream.getTitle());
} catch (ValidationException e) {
LOG.error("Couldn't create events stream <{}/{}>! This is a bug!", streamId, streamTitle, e);
}
}
}
Expand Up @@ -17,6 +17,7 @@
package org.graylog2.plugin.streams;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableSet;
import org.graylog2.indexer.IndexSet;
import org.graylog2.plugin.database.Persisted;

Expand All @@ -27,7 +28,22 @@
import static com.google.common.base.Strings.emptyToNull;

public interface Stream extends Persisted {
/**
* The ID of the default message stream for all messages.
*/
String DEFAULT_STREAM_ID = "000000000000000000000001";
/**
* The ID of the default events stream for user generated events.
*/
String DEFAULT_EVENTS_STREAM_ID = "000000000000000000000002";
/**
* The ID of the default events stream for system events.
*/
String DEFAULT_SYSTEM_EVENTS_STREAM_ID = "000000000000000000000003";
/**
* Contains all default event streams. (e.g. events and system events)
*/
ImmutableSet<String> DEFAULT_EVENT_STREAM_IDS = ImmutableSet.of(DEFAULT_EVENTS_STREAM_ID, DEFAULT_SYSTEM_EVENTS_STREAM_ID);

enum MatchingType {
AND,
Expand Down

0 comments on commit 1d7e176

Please sign in to comment.