Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Use elasticsearch API #65

Closed
wants to merge 3 commits into from

4 participants

@joschi
Owner

graylog2-server currently uses the elasticsearch API over HTTP to check for and create indices, to index new messages, and to remove old messages.

There is also a native Java API which could be used in graylog2-server and which could bring some performance advantages over using the API over HTTP regarding bulk message indexing. But I haven't done any performance and throughput benchmarks yet.

Since the Java API depends on the full elasticsearch JAR this could also be used to implement an embedded ES service so that the user wouldn't have to install an external elasticsearch server and cope with its administration. For smaller installations (or for testing purposes) this would save some setup time.

This changeset basically pulls an interface out of the previous Indexer class and refactors the old implementation to an implementation of the interface. It shouldn't change the behaviour of graylog2-server (except for the index creation which moved from the Main class to the Indexer implementation).

joschi added some commits
@joschi joschi Added generics to Mapping class for elasticsearch index creation 2db0fae
@joschi joschi Corrected access modifiers in Mapping class and added a class constan…
…t for the index/message type in elasticsearch
a81796f
@joschi joschi Refactored Indexer class
* Indexer is now an interface
* The previous implementation is now called ElasticSearchHttpIndexer
* Got rid of the static methods in ElasticSearchHttpIndexer in order to be able to use HTTP keep-alive when indexing messages
* Added elasticsearch to POM
* Implemented ElasticSearchIndexer using the Java API of elasticsearch instead of the RESTful HTTP interface
6fa8af0
@dennisoelkers
@lennartkoopmann

Yes, great work! I'll merge this as soon as 0.9.6 is in master (i.e. after the release) so we have time to test it for 0.9.7!

@jbarciauskas

Hrm, this is going to conflict with my work on creating per-facility type mappings. Let me pull this down and see if I can get them both working...

@jbarciauskas

See pull request #66 for this change + some enhancements, including setting the elasticsearch special value _type to the graylog2 facility

@joschi joschi closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 18, 2011
  1. @joschi
  2. @joschi

    Corrected access modifiers in Mapping class and added a class constan…

    joschi authored
    …t for the index/message type in elasticsearch
  3. @joschi

    Refactored Indexer class

    joschi authored
    * Indexer is now an interface
    * The previous implementation is now called ElasticSearchHttpIndexer
    * Got rid of the static methods in ElasticSearchHttpIndexer in order to be able to use HTTP keep-alive when indexing messages
    * Added elasticsearch to POM
    * Implemented ElasticSearchIndexer using the Java API of elasticsearch instead of the RESTful HTTP interface
This page is out of date. Refresh to see the latest.
View
19 pom.xml
@@ -82,6 +82,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>0.18.5</version>
+ </dependency>
</dependencies>
<build>
@@ -148,6 +153,20 @@
</build>
<name>graylog2-server</name>
<description>http://www.graylog2.org/</description>
+
+ <repositories>
+ <repository>
+ <id>sonatype.oss.releases</id>
+ <name>Sonatype OSS Releases Repository</name>
+ <url>http://oss.sonatype.org/content/repositories/releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
</project>
View
41 src/main/java/org/graylog2/Main.java
@@ -30,6 +30,7 @@
import org.apache.log4j.Logger;
import org.graylog2.database.MongoConnection;
import org.graylog2.forwarders.forwarders.LogglyForwarder;
+import org.graylog2.indexer.ElasticSearchHttpIndexer;
import org.graylog2.indexer.Indexer;
import org.graylog2.messagehandlers.amqp.AMQPBroker;
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
@@ -39,15 +40,9 @@
import org.graylog2.messagehandlers.syslog.SyslogServerThread;
import org.graylog2.messagequeue.MessageQueue;
import org.graylog2.messagequeue.MessageQueueFlusher;
-import org.graylog2.periodical.BulkIndexerThread;
-import org.graylog2.periodical.ChunkedGELFClientManagerThread;
-import org.graylog2.periodical.HostCounterCacheWriterThread;
-import org.graylog2.periodical.MessageCountWriterThread;
-import org.graylog2.periodical.MessageRetentionThread;
-import org.graylog2.periodical.ServerValueWriterThread;
+import org.graylog2.periodical.*;
import java.io.FileWriter;
-import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.util.List;
@@ -127,20 +122,12 @@ public static void main(String[] args) {
// XXX ELASTIC: put in own method
// Check if the index exists. Create it if not.
+ Indexer indexer = null;
+
try {
- if (Indexer.indexExists()) {
- LOG.info("Index exists. Not creating it.");
- } else {
- LOG.info("Index does not exist! Trying to create it ...");
- if (Indexer.createIndex()) {
- LOG.info("Successfully created index.");
- } else {
- LOG.fatal("Could not create Index. Terminating.");
- System.exit(1);
- }
- }
- } catch (IOException e) {
- LOG.fatal("IOException while trying to check Index. Make sure that your ElasticSearch server is running.", e);
+ indexer = new ElasticSearchHttpIndexer(configuration.getElasticSearchIndexName(), "message");
+ } catch (Exception e) {
+ LOG.fatal("Error while connecting to elasticsearch server. Make sure that your elasticsearch server is running.", e);
System.exit(1);
}
@@ -161,7 +148,7 @@ public static void main(String[] args) {
initializeMessageCounters(scheduler);
// Initialize message queue.
- initializeMessageQueue(scheduler, configuration);
+ initializeMessageQueue(scheduler, configuration, indexer);
// Write initial ServerValue information.
writeInitialServerValues(configuration);
@@ -181,13 +168,13 @@ public static void main(String[] args) {
// Start thread that automatically removes messages older than retention time.
if (commandLineArguments.performRetention()) {
- initializeMessageRetentionThread(scheduler);
+ initializeMessageRetentionThread(scheduler, indexer);
} else {
LOG.info("Not initializing retention time cleanup thread because --no-retention was passed.");
}
// Add a shutdown hook that tries to flush the message queue.
- Runtime.getRuntime().addShutdownHook(new MessageQueueFlusher());
+ Runtime.getRuntime().addShutdownHook(new MessageQueueFlusher(indexer));
LOG.info("Graylog2 up and running.");
}
@@ -199,13 +186,13 @@ private static void initializeHostCounterCache(ScheduledExecutorService schedule
LOG.info("Host count cache is up.");
}
- private static void initializeMessageQueue(ScheduledExecutorService scheduler, Configuration configuration) {
+ private static void initializeMessageQueue(ScheduledExecutorService scheduler, Configuration configuration, Indexer indexer) {
// Set the maximum size if it was configured to something else than 0 (= UNLIMITED)
if (configuration.getMessageQueueMaximumSize() != MessageQueue.SIZE_LIMIT_UNLIMITED) {
MessageQueue.getInstance().setMaximumSize(configuration.getMessageQueueMaximumSize());
}
- scheduler.scheduleAtFixedRate(new BulkIndexerThread(configuration), BulkIndexerThread.INITIAL_DELAY, configuration.getMessageQueuePollFrequency(), TimeUnit.SECONDS);
+ scheduler.scheduleAtFixedRate(new BulkIndexerThread(configuration, indexer), BulkIndexerThread.INITIAL_DELAY, configuration.getMessageQueuePollFrequency(), TimeUnit.SECONDS);
LOG.info("Message queue initialized .");
}
@@ -224,9 +211,9 @@ private static void initializeServerValueWriter(ScheduledExecutorService schedul
LOG.info("Server value writer up.");
}
- private static void initializeMessageRetentionThread(ScheduledExecutorService scheduler) {
+ private static void initializeMessageRetentionThread(ScheduledExecutorService scheduler, Indexer indexer) {
// Schedule first run. This is NOT at fixed rate. Thread will itself schedule next run with current frequency setting from database.
- scheduler.schedule(new MessageRetentionThread(),0,TimeUnit.SECONDS);
+ scheduler.schedule(new MessageRetentionThread(indexer), 0, TimeUnit.SECONDS);
LOG.info("Retention time management active.");
}
View
243 src/main/java/org/graylog2/indexer/ElasticSearchHttpIndexer.java
@@ -0,0 +1,243 @@
+/**
+ * Copyright 2011 Lennart Koopmann <lennart@socketfeed.com>
+ *
+ * This file is part of Graylog2.
+ *
+ * Graylog2 is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Graylog2 is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+package org.graylog2.indexer;
+
+import org.apache.log4j.Logger;
+import org.graylog2.Main;
+import org.graylog2.messagehandlers.gelf.GELFMessage;
+import org.json.simple.JSONValue;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * ElasticSearchHttpIndexer.java: Sep 05, 2011 9:13:03 PM
+ * <p/>
+ * Stores/indexes log messages in ElasticSearch.
+ *
+ * @author Lennart Koopmann <lennart@socketfeed.com>
+ */
+public class ElasticSearchHttpIndexer implements Indexer {
+
+ // XXX ELASTIC: refactor.
+
+ private static final Logger LOG = Logger.getLogger(ElasticSearchHttpIndexer.class);
+
+ private String indexName;
+ private String indexType;
+
+ private HttpURLConnection connection = null;
+
+ public ElasticSearchHttpIndexer(String indexName, String indexType) throws Exception {
+ this.indexName = indexName;
+ this.indexType = indexType;
+
+ if(!indexExists()) {
+ createIndex();
+ }
+ }
+
+ /**
+ * Checks if the index for Graylog2 exists
+ * <p/>
+ * See <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-indices-exists.html">elasticsearch Indices Exists API</a> for details.
+ *
+ * @return {@literal true} if the index for Graylog2 exists, {@literal false} otherwise
+ * @throws IOException if elasticsearch server couldn't be reached
+ */
+ private boolean indexExists() throws IOException {
+ URL url = new URL(buildIndexURL());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("HEAD");
+ // Older versions of ElasticSearch return 400 Bad Request in case of an existing index.
+ if (conn.getResponseCode() == HttpURLConnection.HTTP_OK || conn.getResponseCode() == HttpURLConnection.HTTP_BAD_REQUEST) {
+ return true;
+ } else {
+ if (conn.getResponseCode() != HttpURLConnection.HTTP_NOT_FOUND) {
+ LOG.warn("ElasticSearchHttpIndexer response code was not (200 or 400) or 404, but " + conn.getResponseCode());
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Creates the index for Graylog2 including the mapping
+ * <p/>
+ * <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-create-index.html">Create Index API</a> and
+ * <a href="http://www.elasticsearch.org/guide/reference/mapping">elasticsearch Mapping</a>
+ *
+ * @return {@literal true} if the index for Graylog2 could be created, {@literal false} otherwise
+ * @throws IOException if elasticsearch server couldn't be reached
+ */
+ private boolean createIndex() throws IOException {
+
+ Writer writer = null;
+ URL url = new URL(buildIndexURL());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setDoOutput(true);
+ conn.setRequestMethod("POST");
+
+ try {
+ writer = new OutputStreamWriter(conn.getOutputStream());
+
+ // Write Mapping.
+ writer.write(JSONValue.toJSONString(Mapping.get()));
+ writer.flush();
+ if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
+ return true;
+ } else {
+ LOG.warn("Response code of create index operation was not 200, but " + conn.getResponseCode());
+ return false;
+ }
+ } finally {
+ if (null != writer) {
+ writer.close();
+ }
+ }
+ }
+
+ /**
+ * Bulk-indexes/persists messages to ElasticSearch.
+ * <p/>
+ * See <a href="http://www.elasticsearch.org/guide/reference/api/bulk.html">elasticsearch Bulk API</a> for details
+ *
+ * @param messages The messages to index
+ * @return {@literal true} if the messages were successfully indexed, {@literal false} otherwise
+ */
+ public boolean bulkIndex(List<GELFMessage> messages) {
+
+ if (messages.isEmpty()) {
+ return true;
+ }
+
+ Writer writer = null;
+ int responseCode = 0;
+
+ try {
+
+ if(connection == null) {
+ connection = openConnection();
+ }
+
+ connection.connect();
+
+ writer = new OutputStreamWriter(connection.getOutputStream());
+ writer.write(getJSONfromGELFMessages(messages));
+ writer.flush();
+
+ responseCode = connection.getResponseCode();
+ } catch (IOException e) {
+ LOG.warn("IO error when trying to index messages", e);
+ } finally {
+ if (null != writer) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.error("Couldn't close output stream", e);
+ }
+ }
+ }
+
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ return true;
+ } else {
+ LOG.warn("ElasticSearchHttpIndexer response code was not 200, but " + responseCode);
+ return false;
+ }
+ }
+
+ private HttpURLConnection openConnection() throws IOException {
+
+ URL url = new URL(buildElasticSearchURL() + "_bulk");
+
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(true);
+ connection.setRequestMethod("POST");
+
+ return connection;
+ }
+
+ /**
+ * Deletes all messages from index which are older than the specified timestamp.
+ *
+ * @param to UTC UNIX timestamp
+ * @return {@literal true} if the messages were successfully deleted, {@literal false} otherwise
+ */
+ public boolean deleteMessagesByTimeRange(int to) {
+ int responseCode = 0;
+
+ try {
+ URL url = new URL(buildIndexWithTypeUrl() + buildDeleteByQuerySinceDate(to));
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("DELETE");
+ conn.connect();
+
+ responseCode = conn.getResponseCode();
+ } catch (IOException e) {
+ LOG.warn("IO error when trying to delete messages older than date", e);
+ }
+
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ return true;
+ } else {
+ LOG.warn("ElasticSearchHttpIndexer response code was not 200, but " + responseCode);
+ return false;
+ }
+ }
+
+ private static String buildDeleteByQuerySinceDate(int to) {
+ return "/_query?q=created_at%3A%5B0%20TO%20" + to + "%5D";
+ }
+
+ private String getJSONfromGELFMessages(List<GELFMessage> messages) {
+ StringBuilder sb = new StringBuilder();
+
+ for (GELFMessage message : messages) {
+ sb.append("{\"index\":{\"_index\":\"");
+ sb.append(indexName);
+ sb.append("\",\"_type\":\"");
+ sb.append(indexType);
+ sb.append("\"}}\n");
+ sb.append(JSONValue.toJSONString(message.toElasticSearchObject()));
+ sb.append("\n");
+ }
+
+ return sb.toString();
+ }
+
+ private String buildElasticSearchURL() {
+ return Main.configuration.getElasticSearchUrl();
+ }
+
+ private String buildIndexURL() {
+ return buildElasticSearchURL() + indexName;
+ }
+
+ private String buildIndexWithTypeUrl() {
+ return buildIndexURL() + "/" + indexType;
+ }
+
+}
View
196 src/main/java/org/graylog2/indexer/ElasticSearchIndexer.java
@@ -0,0 +1,196 @@
+package org.graylog2.indexer;
+
+import org.apache.log4j.Logger;
+import org.bson.types.ObjectId;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.exists.IndicesExistsResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
+import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.client.action.admin.indices.create.CreateIndexRequestBuilder;
+import org.elasticsearch.client.action.admin.indices.exists.IndicesExistsRequestBuilder;
+import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.graylog2.Tools;
+import org.graylog2.messagehandlers.gelf.GELFMessage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+public class ElasticSearchIndexer implements Indexer {
+
+ private static final Logger LOG = Logger.getLogger(ElasticSearchIndexer.class);
+
+ private Client elasticSearchClient;
+ private String indexName;
+ private String indexType;
+
+ public ElasticSearchIndexer(Client elasticSearchClient, String indexName, String indexType) throws Exception {
+ this.elasticSearchClient = elasticSearchClient;
+ this.indexName = indexName;
+ this.indexType = indexType;
+
+ if(!indexExists()) {
+ createIndex();
+ }
+ }
+
+ /**
+ * Checks if the index for Graylog2 exists
+ * <p/>
+ * See <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-indices-exists.html">elasticsearch Indices Exists API</a> for details.
+ *
+ * @return {@literal true} if the index for Graylog2 exists, {@literal false} otherwise
+ */
+ private boolean indexExists() {
+
+ IndicesExistsRequestBuilder builder = elasticSearchClient.admin().indices().prepareExists(indexName);
+
+ ListenableActionFuture<IndicesExistsResponse> response = builder.execute();
+
+ return response.actionGet().exists();
+ }
+
+ /**
+ * Creates the index for Graylog2 including the mapping
+ * <p/>
+ * <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-create-index.html">Create Index API</a> and
+ * <a href="http://www.elasticsearch.org/guide/reference/mapping">elasticsearch Mapping</a>
+ *
+ * @return {@literal true} if the index for Graylog2 could be created, {@literal false} otherwise
+ * @throws IOException if elasticsearch server couldn't be reached
+ */
+ private boolean createIndex() throws IOException {
+
+ CreateIndexRequestBuilder builder = elasticSearchClient.admin().indices().prepareCreate(indexName);
+
+ ListenableActionFuture<CreateIndexResponse> response = builder.addMapping(indexType, Mapping.get()).execute();
+
+ return response.actionGet().acknowledged();
+ }
+
+ /**
+ * Bulk-indexes/persists messages to ElasticSearch.
+ * <p/>
+ * See <a href="http://www.elasticsearch.org/guide/reference/api/bulk.html">elasticsearch Bulk API</a> for details
+ *
+ * @param messages The messages to index
+ * @return {@literal true} if the messages were successfully indexed, {@literal false} otherwise
+ */
+ @Override
+ public boolean bulkIndex(List<GELFMessage> messages) {
+
+ if (messages.isEmpty()) {
+ return true;
+ }
+
+ BulkRequestBuilder bulkRequest = elasticSearchClient.prepareBulk();
+
+ for(GELFMessage gelfMessage : messages) {
+
+ double createdAt = gelfMessage.getCreatedAt();
+
+ if (createdAt <= 0.0d) {
+ createdAt = Tools.getUTCTimestampWithMilliseconds();
+ }
+
+ // Manually converting stream ID to string - caused strange problems without it.
+ List<String> streamIds = new ArrayList<String>();
+ for (ObjectId id : gelfMessage.getStreamIds()) {
+ streamIds.add(id.toString());
+ }
+
+ try {
+ XContentBuilder contentBuilder = jsonBuilder()
+ .startObject()
+ .field("message", gelfMessage.getShortMessage())
+ .field("full_message", gelfMessage.getFullMessage())
+ .field("file", gelfMessage.getFile())
+ .field("line", gelfMessage.getLine())
+ .field("host", gelfMessage.getHost())
+ .field("facility", gelfMessage.getFacility())
+ .field("level", gelfMessage.getLevel())
+ .field("created_at", createdAt)
+ .field("streams", streamIds);
+
+ for(Map.Entry<String, Object> entry : gelfMessage.getAdditionalData().entrySet()) {
+
+ contentBuilder.field(entry.getKey(), entry.getValue());
+ }
+
+ contentBuilder.endObject();
+
+ bulkRequest.add(elasticSearchClient.prepareIndex(indexName, indexType).setSource(contentBuilder));
+ } catch (IOException e) {
+ LOG.warn("IO error when trying to index messages", e);
+ }
+ }
+
+ ListenableActionFuture<BulkResponse> futureResponse = bulkRequest.execute();
+ BulkResponse bulkResponse = futureResponse.actionGet();
+
+ if(futureResponse.isCancelled()) {
+ return false;
+ } else {
+ if (bulkResponse.hasFailures()) {
+
+ LOG.warn(String.format("%d errors while bulk indexing messages", bulkResponse.items().length));
+
+ if(LOG.isDebugEnabled()) {
+ for(BulkItemResponse responseItem : bulkResponse.items()) {
+
+ LOG.debug("Failure indexing message: " + responseItem.failureMessage());
+ }
+ }
+ }
+
+ return true;
+ }
+ }
+
+ /**
+ * Deletes all messages from index which are older than the specified timestamp.
+ *
+ * @param to UTC UNIX timestamp
+ * @return {@literal true} if the messages were successfully deleted, {@literal false} otherwise
+ */
+ @Override
+ public boolean deleteMessagesByTimeRange(int to) {
+ DeleteByQueryRequest request = Requests.deleteByQueryRequest(indexName).query(
+ QueryBuilders.rangeQuery("created_at").from(0).to(to));
+
+ ActionFuture<DeleteByQueryResponse> futureResponse = elasticSearchClient.deleteByQuery(request);
+
+ DeleteByQueryResponse response = futureResponse.actionGet();
+
+ if(futureResponse.isCancelled()) {
+ return false;
+ } else {
+
+ if (LOG.isDebugEnabled()) {
+ IndexDeleteByQueryResponse indexDeleteByQueryResponse = response.index(indexName);
+
+ LOG.debug(
+ String.format("Finished message removal; successful/failed/total shards: %d/%d/%d",
+ indexDeleteByQueryResponse.successfulShards(),
+ indexDeleteByQueryResponse.failedShards(),
+ indexDeleteByQueryResponse.totalShards()
+ )
+ );
+ }
+
+ return true;
+ }
+ }
+}
View
212 src/main/java/org/graylog2/indexer/Indexer.java
@@ -1,219 +1,13 @@
-/**
- * Copyright 2011 Lennart Koopmann <lennart@socketfeed.com>
- *
- * This file is part of Graylog2.
- *
- * Graylog2 is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * Graylog2 is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
package org.graylog2.indexer;
-import org.apache.log4j.Logger;
-import org.graylog2.Main;
import org.graylog2.messagehandlers.gelf.GELFMessage;
-import org.json.simple.JSONValue;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.util.List;
/**
- * Indexer.java: Sep 05, 2011 9:13:03 PM
- * <p/>
- * Stores/indexes log messages in ElasticSearch.
- *
- * @author Lennart Koopmann <lennart@socketfeed.com>
*/
-public class Indexer {
-
- // XXX ELASTIC: refactor.
-
- private static final Logger LOG = Logger.getLogger(Indexer.class);
-
- public static final String INDEX = Main.configuration.getElasticSearchIndexName();
- public static final String TYPE = "message";
-
- /**
- * Checks if the index for Graylog2 exists
- * <p/>
- * See <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-indices-exists.html">elasticsearch Indices Exists API</a> for details.
- *
- * @return {@literal true} if the index for Graylog2 exists, {@literal false} otherwise
- * @throws IOException if elasticsearch server couldn't be reached
- */
- public static boolean indexExists() throws IOException {
- URL url = new URL(Indexer.buildIndexURL());
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("HEAD");
- // Older versions of ElasticSearch return 400 Bad Request in cse of an existing index.
- if (conn.getResponseCode() == HttpURLConnection.HTTP_OK || conn.getResponseCode() == HttpURLConnection.HTTP_BAD_REQUEST) {
- return true;
- } else {
- if (conn.getResponseCode() != HttpURLConnection.HTTP_NOT_FOUND) {
- LOG.warn("Indexer response code was not (200 or 400) or 404, but " + conn.getResponseCode());
- }
-
- return false;
- }
- }
-
- /**
- * Creates the index for Graylog2 including the mapping
- * <p/>
- * <a href="http://www.elasticsearch.org/guide/reference/api/admin-indices-create-index.html">Create Index API</a> and
- * <a href="http://www.elasticsearch.org/guide/reference/mapping">elasticsearch Mapping</a>
- *
- * @return {@literal true} if the index for Graylog2 could be created, {@literal false} otherwise
- * @throws IOException if elasticsearch server couldn't be reached
- */
- public static boolean createIndex() throws IOException {
-
- Writer writer = null;
- URL url = new URL(Indexer.buildIndexURL());
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setDoOutput(true);
- conn.setRequestMethod("POST");
-
- try {
- writer = new OutputStreamWriter(conn.getOutputStream());
-
- // Write Mapping.
- writer.write(JSONValue.toJSONString(Mapping.get()));
- writer.flush();
- if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
- return true;
- } else {
- LOG.warn("Response code of create index operation was not 200, but " + conn.getResponseCode());
- return false;
- }
- } finally {
- if (null != writer) {
- writer.close();
- }
- }
- }
-
- /**
- * Bulk-indexes/persists messages to ElasticSearch.
- * <p/>
- * See <a href="http://www.elasticsearch.org/guide/reference/api/bulk.html">elasticsearch Bulk API</a> for details
- *
- * @param messages The messages to index
- * @return {@literal true} if the messages were successfully indexed, {@literal false} otherwise
- */
- public static boolean bulkIndex(List<GELFMessage> messages) {
-
- if (messages.isEmpty()) {
- return true;
- }
-
- Writer writer = null;
- int responseCode = 0;
-
- try {
- URL url = new URL(buildElasticSearchURL() + "_bulk");
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setDoOutput(true);
- conn.setRequestMethod("POST");
-
- writer = new OutputStreamWriter(conn.getOutputStream());
- writer.write(getJSONfromGELFMessages(messages));
- writer.flush();
-
- responseCode = conn.getResponseCode();
- } catch (IOException e) {
- LOG.warn("IO error when trying to index messages", e);
- } finally {
- if (null != writer) {
- try {
- writer.close();
- } catch (IOException e) {
- LOG.error("Couldn't close output stream", e);
- }
- }
- }
-
- if (responseCode == HttpURLConnection.HTTP_OK) {
- return true;
- } else {
- LOG.warn("Indexer response code was not 200, but " + responseCode);
- return false;
- }
- }
-
- /**
- * Deletes all messages from index which are older than the specified timestamp.
- *
- * @param to UTC UNIX timestamp
- * @return {@literal true} if the messages were successfully deleted, {@literal false} otherwise
- */
- public static boolean deleteMessagesByTimeRange(int to) {
- int responseCode = 0;
-
- try {
- URL url = new URL(buildIndexWithTypeUrl() + buildDeleteByQuerySinceDate(to));
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("DELETE");
- conn.connect();
-
- responseCode = conn.getResponseCode();
- } catch (IOException e) {
- LOG.warn("IO error when trying to delete messages older than date", e);
- }
-
- if (responseCode == HttpURLConnection.HTTP_OK) {
- return true;
- } else {
- LOG.warn("Indexer response code was not 200, but " + responseCode);
- return false;
- }
- }
-
- private static String buildDeleteByQuerySinceDate(int to) {
- return "/_query?q=created_at%3A%5B0%20TO%20" + to + "%5D";
- }
-
- private static String getJSONfromGELFMessages(List<GELFMessage> messages) {
- StringBuilder sb = new StringBuilder();
-
- for (GELFMessage message : messages) {
- sb.append("{\"index\":{\"_index\":\"");
- sb.append(INDEX);
- sb.append("\",\"_type\":\"");
- sb.append(TYPE);
- sb.append("\"}}\n");
- sb.append(JSONValue.toJSONString(message.toElasticSearchObject()));
- sb.append("\n");
- }
-
- return sb.toString();
- }
-
- private static String buildElasticSearchURL() {
- return Main.configuration.getElasticSearchUrl();
- }
-
- private static String buildIndexURL() {
- return buildElasticSearchURL() + Indexer.INDEX;
- }
-
- private static String buildIndexWithTypeUrl() {
- return buildIndexURL() + "/" + Indexer.TYPE;
- }
+public interface Indexer {
+ boolean bulkIndex(List<GELFMessage> messages);
+ boolean deleteMessagesByTimeRange(int to);
}
View
34 src/main/java/org/graylog2/indexer/Mapping.java
@@ -34,16 +34,18 @@
* @author Lennart Koopmann <lennart@socketfeed.com>
*/
public class Mapping {
+
+ private static String TYPE = "message";
- public static Map get() {
- Map mapping = new HashMap();
+ public static Map<String, Object> get() {
+ Map<String, Object> mapping = new HashMap<String, Object>();
mapping.put("properties", partFieldProperties());
mapping.put("dynamic_templates", partDefaultAllInDynamicTemplate());
- Map completeMapping = new HashMap();
- completeMapping.put(Indexer.TYPE, mapping);
+ Map<String, Map> completeMapping = new HashMap<String, Map>();
+ completeMapping.put(TYPE, mapping);
- Map spec = new HashMap();
+ Map<String, Object> spec = new HashMap<String, Object>();
spec.put("mappings", completeMapping);
return spec;
@@ -52,11 +54,11 @@ public static Map get() {
/*
* Disable analyzing for every field by default.
*/
- public static List partDefaultAllInDynamicTemplate() {
- List dynamicTemplates = new LinkedList();
- Map template = new HashMap();
- Map defaultAll = new HashMap();
- Map notAnalyzed = new HashMap();
+ private static List<Map> partDefaultAllInDynamicTemplate() {
+ List<Map> dynamicTemplates = new LinkedList<Map>();
+ Map<String, Map> template = new HashMap<String, Map>();
+ Map<String, Object> defaultAll = new HashMap<String, Object>();
+ Map<String, String> notAnalyzed = new HashMap<String, String>();
notAnalyzed.put("index", "not_analyzed");
// Match all.
@@ -73,8 +75,8 @@ public static List partDefaultAllInDynamicTemplate() {
/*
* Enable analyzing for some fields again. Like for message and full_message.
*/
- public static Map partFieldProperties() {
- Map properties = new HashMap();
+ private static Map partFieldProperties() {
+ Map<String, Map<String, String>> properties = new HashMap<String, Map<String, String>>();
properties.put("message", analyzedString());
properties.put("full_message", analyzedString());
@@ -85,16 +87,16 @@ public static Map partFieldProperties() {
return properties;
}
- public static Map analyzedString() {
- Map type = new HashMap();
+ private static Map<String, String> analyzedString() {
+ Map<String, String> type = new HashMap<String, String>();
type.put("index", "analyzed");
type.put("type", "string");
return type;
}
- public static Map typeNumberDouble() {
- Map type = new HashMap();
+ private static Map<String, String> typeNumberDouble() {
+ Map<String, String> type = new HashMap<String, String>();
type.put("type", "double");
return type;
View
4 src/main/java/org/graylog2/indexer/retention/MessageRetention.java
@@ -34,10 +34,10 @@
private static final Logger LOG = Logger.getLogger(MessageRetention.class);
- public static boolean performCleanup(int timeDays) {
+ public static boolean performCleanup(int timeDays, Indexer indexer) {
int to = Tools.getTimestampDaysAgo(Tools.getUTCTimestamp(), timeDays);
LOG.debug("Deleting all messages older than " + to + " (" + timeDays + " days ago)");
- return Indexer.deleteMessagesByTimeRange(to);
+ return indexer.deleteMessagesByTimeRange(to);
}
public static void updateLastPerformedTime() {
View
11 src/main/java/org/graylog2/messagequeue/MessageQueueFlusher.java
@@ -19,11 +19,12 @@
*/
package org.graylog2.messagequeue;
-import java.util.List;
import org.apache.log4j.Logger;
import org.graylog2.indexer.Indexer;
import org.graylog2.messagehandlers.gelf.GELFMessage;
+import java.util.List;
+
/**
* MessageQueueFlusher.java: Nov 17, 2011 7:02:40 PM
*
@@ -33,6 +34,12 @@
private static final Logger LOG = Logger.getLogger(MessageQueueFlusher.class);
+ private Indexer indexer;
+
+ public MessageQueueFlusher(Indexer indexer) {
+ this.indexer = indexer;
+ }
+
@Override
public void run() {
try {
@@ -43,7 +50,7 @@ public void run() {
List<GELFMessage> messages = MessageQueue.getInstance().readAll();
LOG.info("Flushing all " + messages.size() + " messages to indexer.");
- Indexer.bulkIndex(messages);
+ indexer.bulkIndex(messages);
} catch (Exception e) {
LOG.warn("Error while flushing messages from queue: " + e.getMessage(), e);
} finally {
View
10 src/main/java/org/graylog2/periodical/BulkIndexerThread.java
@@ -20,7 +20,6 @@
package org.graylog2.periodical;
-import java.util.List;
import org.apache.log4j.Logger;
import org.graylog2.Configuration;
import org.graylog2.ServerValue;
@@ -28,6 +27,8 @@
import org.graylog2.messagehandlers.gelf.GELFMessage;
import org.graylog2.messagequeue.MessageQueue;
+import java.util.List;
+
/**
* BulkIndexerThread.java: Nov 16, 2011 5:25:32 PM
* <p/>
@@ -44,9 +45,12 @@
private int batchSize;
private int pollFreq;
- public BulkIndexerThread(Configuration configuration) {
+ private Indexer indexer;
+
+ public BulkIndexerThread(Configuration configuration, Indexer indexer) {
this.batchSize = configuration.getMessageQueueBatchSize();
this.pollFreq = configuration.getMessageQueuePollFrequency();
+ this.indexer = indexer;
LOG.info("Initialized message queue bulk indexer with batch size <" + this.batchSize + "> and polling frequency <" + this.pollFreq + ">.");
}
@@ -60,7 +64,7 @@ public void run() {
List<GELFMessage> messages = mq.readBatch(batchSize);
LOG.info("... indexing " + messages.size() + " messages.");
- Indexer.bulkIndex(messages);
+ indexer.bulkIndex(messages);
/*
* Write message queue size information to server values. We do this
View
18 src/main/java/org/graylog2/periodical/MessageRetentionThread.java
@@ -20,12 +20,14 @@
package org.graylog2.periodical;
-import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.graylog2.Main;
+import org.graylog2.indexer.Indexer;
import org.graylog2.indexer.retention.MessageRetention;
import org.graylog2.settings.Setting;
+import java.util.concurrent.TimeUnit;
+
/**
* MessageRetentionThread.java: Nov 22, 2011 7:35:10 PM
* <p/>
@@ -36,8 +38,12 @@
public class MessageRetentionThread implements Runnable {
private static final Logger LOG = Logger.getLogger(MessageRetentionThread.class);
+
+ private Indexer indexer;
- private int retentionTimeDays;
+ public MessageRetentionThread(Indexer indexer) {
+ this.indexer = indexer;
+ }
@Override
public void run() {
@@ -47,11 +53,11 @@ public void run() {
* to be sure to always have the *current* setting from database - even when
* used with scheduler.
*/
- this.retentionTimeDays = Setting.getRetentionTimeInDays();
+ int retentionTimeDays = Setting.getRetentionTimeInDays();
- LOG.info("Initialized message retention thread - cleaning all messages older than <" + this.retentionTimeDays + " days>.");
+ LOG.info("Initialized message retention thread - cleaning all messages older than <" + retentionTimeDays + " days>.");
- if (MessageRetention.performCleanup(this.retentionTimeDays)) {
+ if (MessageRetention.performCleanup(retentionTimeDays, indexer)) {
MessageRetention.updateLastPerformedTime();
}
} catch (Exception e) {
@@ -64,7 +70,7 @@ public void run() {
private void scheduleNextRun() {
// Schedule next run in [current frequency setting from database] minutes.
int when = Setting.getRetentionFrequencyInMinutes();
- Main.scheduler.schedule(new MessageRetentionThread(), when, TimeUnit.MINUTES);
+ Main.scheduler.schedule(new MessageRetentionThread(indexer), when, TimeUnit.MINUTES);
LOG.info("Scheduled next run of MessageRetentionThread in <" + when + " minutes>.");
}
}
Something went wrong with that request. Please try again.