Skip to content

Commit

Permalink
first parts of a working deflector
Browse files Browse the repository at this point in the history
  • Loading branch information
Lennart Koopmann committed Sep 6, 2012
1 parent ff53563 commit 13d04dd
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 29 deletions.
3 changes: 2 additions & 1 deletion misc/graylog2.conf
Expand Up @@ -14,10 +14,11 @@ syslog_store_full_message = true

# ElasticSearch URL (default: http://localhost:9200/)
elasticsearch_url = http://localhost:9200/
elasticsearch_index_name = graylog2
elasticsearch_index_prefix = graylog2
# Embedded elasticsearch configuration file
# pay attention to the working directory of the server, maybe use an absolute path here
elasticsearch_config_file = /etc/graylog2-elasticsearch.yml
elasticsearch_max_docs_per_index = 80000000

# How many minutes of messages do you want to keep in the recent index? This index lives in memory only and is used to build the overview and stream pages. Raise this value if you want to see more messages in the overview pages. This is not affecting for example searches which are always targeting *all* indices.
recent_index_ttl_minutes = 60
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/org/graylog2/Configuration.java
Expand Up @@ -91,8 +91,11 @@ public class Configuration {
@Parameter(value = "elasticsearch_config_file", required = true, validator = FilePresentValidator.class)
private String elasticSearchConfigFile = "/etc/graylog2-elasticsearch.yml";

@Parameter(value = "elasticsearch_index_name", required = true)
private String elasticsearchIndexName = "graylog2";
@Parameter(value = "elasticsearch_index_prefix", required = true)
private String elasticsearchIndexPrefix = "graylog2";

@Parameter(value = "elasticsearch_max_docs_per_index", validator = PositiveIntegerValidator.class, required = true)
private int elasticsearchMaxDocsPerIndex = 80000000;

@Parameter(value = "mongodb_user")
private String mongoUser;
Expand Down Expand Up @@ -261,8 +264,12 @@ public String getElasticSearchConfigFile() {
return elasticSearchConfigFile;
}

public String getElasticSearchIndexName() {
return this.elasticsearchIndexName;
public String getElasticSearchIndexPrefix() {
return this.elasticsearchIndexPrefix;
}

public int getElasticSearchMaxDocsPerIndex() {
return this.elasticsearchMaxDocsPerIndex;
}

public boolean isMongoUseAuth() {
Expand Down
25 changes: 13 additions & 12 deletions src/main/java/org/graylog2/GraylogServer.java
Expand Up @@ -42,6 +42,7 @@
import com.yammer.metrics.HealthChecks;
import com.yammer.metrics.core.HealthCheck;
import org.graylog2.database.HostCounterCache;
import org.graylog2.indexer.Deflector;

public class GraylogServer implements Runnable {

Expand Down Expand Up @@ -77,6 +78,8 @@ public class GraylogServer implements Runnable {
private ProcessBuffer processBuffer;
private OutputBuffer outputBuffer;

private Deflector deflector;

private String serverId;

public void initialize(Configuration configuration) {
Expand Down Expand Up @@ -151,19 +154,13 @@ public void run() {
BlacklistCache.initialize(this);
StreamCache.initialize(this);

if (indexer.indexExists(indexer.getMainIndexName())) {
LOG.info("Main index exists. Not creating it.");
} else {
LOG.info("Main index does not exist! Trying to create it ...");
if (indexer.createIndex()) {
LOG.info("Successfully created main index.");
} else {
LOG.fatal("Could not create main index. Terminating.");
System.exit(1);
}
}

// XXX TODO lol code duplication. make this smart.
// Set up deflector.
LOG.info("Setting up deflector.");
deflector = new Deflector(this);
deflector.setUp();

// Set up recent index.
if (indexer.indexExists(EmbeddedElasticSearchClient.RECENT_INDEX_NAME)) {
LOG.info("Recent index exists. Not creating it.");
} else {
Expand Down Expand Up @@ -262,6 +259,10 @@ public HostCounterCache getHostCounterCache() {
return this.hostCounterCache;
}

public Deflector getDeflector() {
return this.deflector;
}

public int getLastReceivedMessageTimestamp() {
return this.lastReceivedMessageTimestamp;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/graylog2/Main.java
Expand Up @@ -129,6 +129,7 @@ public static void main(String[] args) {
if (configuration.isEnableHealthCheckHttpApi()) {
server.registerInitializer(new HealthCheckHTTPServerInitializer(configuration.getHealthCheckHttpApiPort()));
}
server.registerInitializer(new DeflectorThreadsInitializer(server));

// Register inputs.
if (configuration.isUseGELF()) {
Expand Down
160 changes: 160 additions & 0 deletions src/main/java/org/graylog2/indexer/Deflector.java
@@ -0,0 +1,160 @@
/**
* Copyright 2012 Lennart Koopmann <lennart@socketfeed.com>
*
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/
package org.graylog2.indexer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import org.apache.log4j.Logger;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.graylog2.GraylogServer;

/**
*
* Format of actual indexes behind the Deflector:
* [configured_prefix]_1
* [configured_prefix]_2
* [configured_prefix]_3
* ...
*
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class Deflector {

private static final Logger LOG = Logger.getLogger(Deflector.class);

public static final String DEFLECTOR_NAME = "graylog2_deflector";

GraylogServer server;

public Deflector(GraylogServer server) {
this.server = server;
}

public boolean isUp() {
return this.server.getIndexer().indexExists(DEFLECTOR_NAME);
}

public void setUp() {
// Check if there already is an deflector index pointing somewhere.
if (isUp()) {
LOG.info("Found deflector alias <" + DEFLECTOR_NAME + ">. Using it.");
} else {
LOG.info("Did not find an deflector alias. Setting one up now.");

// Do we have a target index to point to?
try {
String currentTarget = getCurrentTargetName();
LOG.info("Pointing to already existing index target <" + currentTarget + ">");
pointTo(currentTarget);
} catch(NoTargetIndexException ex) {
LOG.info("There is no index target to point to. Creating one now.");
cycle(); // No index, so automatically cycling to 0.
}
}
}

public void cycle() {
LOG.info("Cycling deflector to next index now.");
int oldTargetNumber;

try {
oldTargetNumber = getCurrentTargetNumber();
} catch (NoTargetIndexException ex) {
oldTargetNumber = -1;
}

int newTargetNumber = oldTargetNumber+1;

String newTarget = buildIndexName(server.getConfiguration().getElasticSearchIndexPrefix(), newTargetNumber);
String oldTarget = buildIndexName(server.getConfiguration().getElasticSearchIndexPrefix(), oldTargetNumber);

LOG.info("Cycling from <" + oldTarget + "> to <" + newTarget + ">");

// Create new index.
LOG.info("Creating index target <" + newTarget + ">...");
server.getIndexer().createIndex(newTarget);
LOG.info("Done!");

// Point to deflector to new index.
LOG.info("Pointing deflector to new target index....");
if (oldTargetNumber == -1) {
// Only poiting, not cycling.
pointTo(newTarget);
} else {
// Re-poiting from existing old index to the new one.
pointTo(newTarget, oldTarget);
}
LOG.info("Done!");
}

public int getCurrentTargetNumber() throws NoTargetIndexException {
Map<String, IndexStats> indexes = this.server.getIndexer().getIndices();
if (indexes.isEmpty()) {
throw new NoTargetIndexException();
}

List<Integer> indexNumbers = new ArrayList<Integer>();

for(Map.Entry<String, IndexStats> e : indexes.entrySet()) {
try {
indexNumbers.add(extractIndexNumber(e.getKey()));
} catch (NumberFormatException ex) {
continue;
}
}

if (indexNumbers.isEmpty()) {
throw new NoTargetIndexException();
}

return Collections.max(indexNumbers);
}

public String getCurrentTargetName() throws NoTargetIndexException {
return buildIndexName(this.server.getConfiguration().getElasticSearchIndexPrefix(), getCurrentTargetNumber());
}

public String buildIndexName(String prefix, int number) {
return prefix + "_" + number;
}

public int extractIndexNumber(String indexName) throws NumberFormatException {
String[] parts = indexName.split("_");

try {
return Integer.parseInt(parts[parts.length-1]);
} catch(Exception e) {
LOG.debug("Could not extract index number from index <" + indexName + ">.");
throw new NumberFormatException();
}
}

private void pointTo(String newIndex, String oldIndex) {
server.getIndexer().cycleAlias(DEFLECTOR_NAME, newIndex, oldIndex);
}

private void pointTo(String newIndex) {
server.getIndexer().cycleAlias(DEFLECTOR_NAME, newIndex);
}

}
55 changes: 45 additions & 10 deletions src/main/java/org/graylog2/indexer/EmbeddedElasticSearchClient.java
Expand Up @@ -14,12 +14,17 @@
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
Expand Down Expand Up @@ -83,19 +88,29 @@ public void run() {
});

}

public String allIndicesAlias() {
return server.getConfiguration().getElasticSearchIndexPrefix() + "_*";
}

public Map<String, IndexStats> getIndices() {
ActionFuture<IndicesStats> isr = client.admin().indices().stats(new IndicesStatsRequest().all());

return isr.actionGet().indices();
}

public boolean indexExists(String index) {
ActionFuture<IndicesExistsResponse> existsFuture = client.admin().indices().exists(new IndicesExistsRequest(index));
return existsFuture.actionGet().exists();
}

public boolean createIndex() {
final ActionFuture<CreateIndexResponse> createFuture = client.admin().indices().create(new CreateIndexRequest(getMainIndexName()));
public boolean createIndex(String indexName) {
final ActionFuture<CreateIndexResponse> createFuture = client.admin().indices().create(new CreateIndexRequest(indexName));
final boolean acknowledged = createFuture.actionGet().acknowledged();
if (!acknowledged) {
return false;
}
final PutMappingRequest mappingRequest = Mapping.getPutMappingRequest(client, getMainIndexName());
final PutMappingRequest mappingRequest = Mapping.getPutMappingRequest(client, indexName);
final boolean mappingCreated = client.admin().indices().putMapping(mappingRequest).actionGet().acknowledged();
return acknowledged && mappingCreated;
}
Expand All @@ -117,6 +132,30 @@ public boolean createRecentIndex() {
final boolean mappingCreated = client.admin().indices().putMapping(mappingRequest).actionGet().acknowledged();
return acknowledged && mappingCreated;
}

public boolean cycleAlias(String aliasName, String targetIndex) {
return client.admin().indices().prepareAliases()
.addAlias(targetIndex, aliasName)
.execute().actionGet().acknowledged();
}

public boolean cycleAlias(String aliasName, String targetIndex, String oldIndex) {
return client.admin().indices().prepareAliases()
.removeAlias(oldIndex, aliasName)
.addAlias(targetIndex, aliasName)
.execute().actionGet().acknowledged();
}

public long numberOfMessages(String indexName) throws IndexNotFoundException {
Map<String, IndexStats> indices = getIndices();
IndexStats index = indices.get(indexName);

if (index == null) {
throw new IndexNotFoundException();
}

return index.getTotal().docs().count();
}

public boolean bulkIndex(final List<LogMessage> messages) {
if (messages.isEmpty()) {
Expand All @@ -130,7 +169,7 @@ public boolean bulkIndex(final List<LogMessage> messages) {
// We manually set the same ID to allow linking between indices later.
final String id = UUID.randomBase64UUID();

b.add(buildIndexRequest(getMainIndexName(), source, id, 0)); // Main index.
b.add(buildIndexRequest(Deflector.DEFLECTOR_NAME, source, id, 0)); // Main index.
b.add(buildIndexRequest(RECENT_INDEX_NAME, source, id, server.getConfiguration().getRecentIndexTtlMinutes())); // Recent index.
}

Expand All @@ -144,16 +183,12 @@ public boolean bulkIndex(final List<LogMessage> messages) {
}

public boolean deleteMessagesByTimeRange(int to) {
DeleteByQueryRequestBuilder b = client.prepareDeleteByQuery(new String[] {getMainIndexName()});
DeleteByQueryRequestBuilder b = client.prepareDeleteByQuery(new String[] {allIndicesAlias()});
b.setTypes(new String[] {TYPE});
final QueryBuilder qb = rangeQuery("created_at").from(0).to(to);
b.setQuery(qb);
ActionFuture<DeleteByQueryResponse> future = client.deleteByQuery(b.request());
return future.actionGet().index(getMainIndexName()).failedShards() == 0;
}

public String getMainIndexName() {
return server.getConfiguration().getElasticSearchIndexName();
return future.actionGet().index(allIndicesAlias()).failedShards() == 0;
}

// yyyy-MM-dd HH-mm-ss
Expand Down

0 comments on commit 13d04dd

Please sign in to comment.