From 3d096e5634328892625d526f73eadf37af575684 Mon Sep 17 00:00:00 2001 From: Ralph Goers Date: Mon, 4 Apr 2022 01:44:16 -0700 Subject: [PATCH] Fork the sink from main flume build --- .github/workflows/build.yml | 63 +++ .github/workflows/codeql-analysis.yml | 70 +++ .gitignore | 22 + flume-elasticsearch-sink/pom.xml | 96 ++++ ...asticSearchIndexRequestBuilderFactory.java | 124 +++++ .../elasticsearch/ContentBuilderUtil.java | 86 +++ .../ElasticSearchDynamicSerializer.java | 73 +++ .../ElasticSearchEventSerializer.java | 48 ++ ...asticSearchIndexRequestBuilderFactory.java | 60 +++ .../ElasticSearchLogStashEventSerializer.java | 145 +++++ .../sink/elasticsearch/ElasticSearchSink.java | 434 +++++++++++++++ .../ElasticSearchSinkConstants.java | 111 ++++ ...tSerializerIndexRequestBuilderFactory.java | 69 +++ .../sink/elasticsearch/IndexNameBuilder.java | 42 ++ .../elasticsearch/SimpleIndexNameBuilder.java | 46 ++ .../TimeBasedIndexNameBuilder.java | 92 ++++ .../sink/elasticsearch/TimestampedEvent.java | 60 +++ .../client/ElasticSearchClient.java | 58 ++ .../client/ElasticSearchClientFactory.java | 78 +++ .../client/ElasticSearchRestClient.java | 149 ++++++ .../client/ElasticSearchTransportClient.java | 228 ++++++++ .../client/NoSuchClientTypeException.java | 23 + .../elasticsearch/client/RoundRobinList.java | 44 ++ .../AbstractElasticSearchSinkTest.java | 164 ++++++ .../TestElasticSearchDynamicSerializer.java | 64 +++ ...asticSearchIndexRequestBuilderFactory.java | 215 ++++++++ ...tElasticSearchLogStashEventSerializer.java | 127 +++++ .../elasticsearch/TestElasticSearchSink.java | 505 ++++++++++++++++++ .../TestElasticSearchSinkCreation.java | 49 ++ .../TimeBasedIndexNameBuilderTest.java | 60 +++ .../elasticsearch/TimestampedEventTest.java | 88 +++ .../client/RoundRobinListTest.java | 42 ++ .../TestElasticSearchClientFactory.java | 64 +++ .../client/TestElasticSearchRestClient.java | 180 +++++++ .../TestElasticSearchTransportClient.java | 127 +++++ .../src/test/resources/log4j2.xml | 31 ++ pom.xml | 277 ++++++++++ 37 files changed, 4214 insertions(+) create mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/codeql-analysis.yml create mode 100644 .gitignore create mode 100644 flume-elasticsearch-sink/pom.xml create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java create mode 100644 flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java create mode 100644 flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java create mode 100644 flume-elasticsearch-sink/src/test/resources/log4j2.xml create mode 100644 pom.xml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..96130aa --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache license, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the license for the specific language governing permissions and +# limitations under the license. + +name: build + +on: + push: + branches: + - trunk + pull_request: + +jobs: + build: + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest, macos-latest ] + + steps: + + - name: Checkout repository + uses: actions/checkout@v2 + + # JDK 8 is needed for the build, and it is the primary bytecode target. + - name: Setup JDK 8 + uses: actions/setup-java@v2.3.0 + with: + distribution: temurin + java-version: 8 + java-package: jdk + architecture: x64 + cache: maven + + - name: Inspect environment (Linux) + if: runner.os == 'Linux' + run: env | grep '^JAVA' + + - name: Inspect environment (MacOS) + if: runner.os == 'macOS' + run: env | grep '^JAVA' + + - name: Build with Maven + timeout-minutes: 120 + shell: bash + run: | + ./mvnw clean verify -DredirectTestOutput=true \ + --show-version --batch-mode --errors --no-transfer-progress \ + -DtrimStackTrace=false \ + -Dsurefire.rerunFailingTestsCount=2 diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..20d8be9 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,70 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ trunk ] + pull_request: + # The branches below must be a subset of the branches above + branches: [ trunk ] + schedule: + - cron: '15 0 * * 5' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + + strategy: + fail-fast: false + matrix: + language: [ 'java', 'python' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ] + # Learn more about CodeQL language support at https://git.io/codeql-language-support + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v1 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v1 + + # â„šī¸ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # âœī¸ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a0a932 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Lines that start with '#' are comments. +*~ +*.diff +*# +.classpath +.project +.settings +bin/flume-env.sh +conf/flume-site.xml +bin/.settings +.eclipse +pmd_report.html +*/bin +target +patchprocess +derby.log +.idea +*.iml +nb-configuration.xml +.DS_Store +/.mvn/wrapper/maven-wrapper.jar +**/metastore_db diff --git a/flume-elasticsearch-sink/pom.xml b/flume-elasticsearch-sink/pom.xml new file mode 100644 index 0000000..a5beb8e --- /dev/null +++ b/flume-elasticsearch-sink/pom.xml @@ -0,0 +1,96 @@ + + + + + 4.0.0 + + + flume-search + org.apache.flume + 1.0.0-SNAPSHOT + + + org.apache.flume + flume-elasticsearch-sink + Apache Flume ElasticSearch Sink + + + + 8 + 10 + + + + + + org.apache.flume + flume-ng-sdk + + + + org.apache.flume + flume-ng-core + + + + org.slf4j + slf4j-api + + + + org.elasticsearch + elasticsearch + true + + + + org.apache.httpcomponents + httpclient + + + + junit + junit + test + + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + + org.apache.logging.log4j + log4j-1.2-api + test + + + + commons-lang + commons-lang + + + + com.google.guava + guava + + + + org.mockito + mockito-all + test + + + + diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..754155c --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; +import org.apache.flume.formatter.output.BucketPath; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Abstract base class for custom implementations of + * {@link ElasticSearchIndexRequestBuilderFactory}. + */ +public abstract class AbstractElasticSearchIndexRequestBuilderFactory + implements ElasticSearchIndexRequestBuilderFactory { + + /** + * {@link FastDateFormat} to use for index names + * in {@link #getIndexName(String, long)} + */ + protected final FastDateFormat fastDateFormat; + + /** + * Constructor for subclasses + * @param fastDateFormat {@link FastDateFormat} to use for index names + */ + protected AbstractElasticSearchIndexRequestBuilderFactory(FastDateFormat fastDateFormat) { + this.fastDateFormat = fastDateFormat; + } + + /** + * @see Configurable + */ + @Override + public abstract void configure(Context arg0); + + /** + * @see ConfigurableComponent + */ + @Override + public abstract void configure(ComponentConfiguration arg0); + + /** + * Creates and prepares an {@link IndexRequestBuilder} from the supplied + * {@link Client} via delegation to the subclass-hook template methods + * {@link #getIndexName(String, long)} and + * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)} + */ + @Override + public IndexRequestBuilder createIndexRequest(Client client, + String indexPrefix, String indexType, Event event) throws IOException { + IndexRequestBuilder request = prepareIndex(client); + String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders()); + String realIndexType = BucketPath.escapeString(indexType, event.getHeaders()); + + TimestampedEvent timestampedEvent = new TimestampedEvent(event); + long timestamp = timestampedEvent.getTimestamp(); + + String indexName = getIndexName(realIndexPrefix, timestamp); + prepareIndexRequest(request, indexName, realIndexType, timestampedEvent); + return request; + } + + @VisibleForTesting + IndexRequestBuilder prepareIndex(Client client) { + return client.prepareIndex(); + } + + /** + * Gets the name of the index to use for an index request + * @param indexPrefix + * Prefix of index name to use -- as configured on the sink + * @param timestamp + * timestamp (millis) to format / use + * @return index name of the form 'indexPrefix-formattedTimestamp' + */ + protected String getIndexName(String indexPrefix, long timestamp) { + return new StringBuilder(indexPrefix).append('-') + .append(fastDateFormat.format(timestamp)).toString(); + } + + /** + * Prepares an ElasticSearch {@link IndexRequestBuilder} instance + * @param indexRequest + * The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare + * @param indexName + * Index name to use -- as per {@link #getIndexName(String, long)} + * @param indexType + * Index type to use -- as configured on the sink + * @param event + * Flume event to serialize and add to index request + * @throws IOException + * If an error occurs e.g. during serialization + */ + protected abstract void prepareIndexRequest( + IndexRequestBuilder indexRequest, String indexName, + String indexType, Event event) throws IOException; + +} \ No newline at end of file diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java new file mode 100644 index 0000000..4fda1b8 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ContentBuilderUtil.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.elasticsearch.common.jackson.core.JsonParseException; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; + +/** + * Utility methods for using ElasticSearch {@link XContentBuilder} + */ +public class ContentBuilderUtil { + + private static final Charset charset = Charset.defaultCharset(); + + private ContentBuilderUtil() { + } + + public static void appendField(XContentBuilder builder, String field, + byte[] data) throws IOException { + XContentType contentType = XContentFactory.xContentType(data); + if (contentType == null) { + addSimpleField(builder, field, data); + } else { + addComplexField(builder, field, contentType, data); + } + } + + public static void addSimpleField(XContentBuilder builder, String fieldName, + byte[] data) throws IOException { + builder.field(fieldName, new String(data, charset)); + } + + public static void addComplexField(XContentBuilder builder, String fieldName, + XContentType contentType, byte[] data) throws IOException { + XContentParser parser = null; + try { + // Elasticsearch will accept JSON directly but we need to validate that + // the incoming event is JSON first. Sadly, the elasticsearch JSON parser + // is a stream parser so we need to instantiate it, parse the event to + // validate it, then instantiate it again to provide the JSON to + // elasticsearch. + // If validation fails then the incoming event is submitted to + // elasticsearch as plain text. + parser = XContentFactory.xContent(contentType).createParser(data); + while (parser.nextToken() != null) {}; + + // If the JSON is valid then include it + parser = XContentFactory.xContent(contentType).createParser(data); + // Add the field name, but not the value. + builder.field(fieldName); + // This will add the whole parsed content as the value of the field. + builder.copyCurrentStructure(parser); + } catch (JsonParseException ex) { + // If we get an exception here the most likely cause is nested JSON that + // can't be figured out in the body. At this point just push it through + // as is + addSimpleField(builder, fieldName, data); + } finally { + if (parser != null) { + parser.close(); + } + } + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java new file mode 100644 index 0000000..aa7ad39 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchDynamicSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; +import java.util.Map; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.elasticsearch.common.xcontent.XContentBuilder; + +/** + * Basic serializer that serializes the event body and header fields into + * individual fields

+ * + * A best effort will be used to determine the content-type, if it cannot be + * determined fields will be indexed as Strings + */ +public class ElasticSearchDynamicSerializer implements + ElasticSearchEventSerializer { + + @Override + public void configure(Context context) { + // NO-OP... + } + + @Override + public void configure(ComponentConfiguration conf) { + // NO-OP... + } + + @Override + public XContentBuilder getContentBuilder(Event event) throws IOException { + XContentBuilder builder = jsonBuilder().startObject(); + appendBody(builder, event); + appendHeaders(builder, event); + return builder; + } + + private void appendBody(XContentBuilder builder, Event event) + throws IOException { + ContentBuilderUtil.appendField(builder, "body", event.getBody()); + } + + private void appendHeaders(XContentBuilder builder, Event event) + throws IOException { + Map headers = event.getHeaders(); + for (String key : headers.keySet()) { + ContentBuilderUtil.appendField(builder, key, + headers.get(key).getBytes(charset)); + } + } + +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java new file mode 100644 index 0000000..c89d627 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; +import org.elasticsearch.common.io.BytesStream; + +/** + * Interface for an event serializer which serializes the headers and body of an + * event to write them to ElasticSearch. This is configurable, so any config + * params required should be taken through this. + */ +public interface ElasticSearchEventSerializer extends Configurable, + ConfigurableComponent { + + public static final Charset charset = Charset.defaultCharset(); + + /** + * Return an {@link BytesStream} made up of the serialized flume event + * @param event + * The flume event to serialize + * @return A {@link BytesStream} used to write to ElasticSearch + * @throws IOException + * If an error occurs during serialization + */ + abstract BytesStream getContentBuilder(Event event) throws IOException; +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..f76308c --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; + +import java.io.IOException; +import java.util.TimeZone; + +/** + * Interface for creating ElasticSearch {@link IndexRequestBuilder} instances + * from serialized flume events. This is configurable, so any config params + * required should be taken through this. + */ +public interface ElasticSearchIndexRequestBuilderFactory extends Configurable, + ConfigurableComponent { + + static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd", + TimeZone.getTimeZone("Etc/UTC")); + + /** + * @param client + * ElasticSearch {@link Client} to prepare index from + * @param indexPrefix + * Prefix of index name to use -- as configured on the sink + * @param indexType + * Index type to use -- as configured on the sink + * @param event + * Flume event to serialize and add to index request + * @return prepared ElasticSearch {@link IndexRequestBuilder} instance + * @throws IOException + * If an error occurs e.g. during serialization + */ + IndexRequestBuilder createIndexRequest(Client client, String indexPrefix, + String indexType, Event event) throws IOException; + + + +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java new file mode 100644 index 0000000..3638368 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Date; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.xcontent.XContentBuilder; + +/** + * Serialize flume events into the same format LogStash uses

+ * + * This can be used to send events to ElasticSearch and use clients such as + * Kabana which expect Logstash formated indexes + * + *
+ * {
+ *    "@timestamp": "2010-12-21T21:48:33.309258Z",
+ *    "@tags": [ "array", "of", "tags" ],
+ *    "@type": "string",
+ *    "@source": "source of the event, usually a URL."
+ *    "@source_host": ""
+ *    "@source_path": ""
+ *    "@fields":{
+ *       # a set of fields for this event
+ *       "user": "jordan",
+ *       "command": "shutdown -r":
+ *     }
+ *     "@message": "the original plain-text message"
+ *   }
+ * 
+ * + * If the following headers are present, they will map to the above logstash + * output as long as the logstash fields are not already present.

+ * + *
+ *  timestamp: long -> @timestamp:Date
+ *  host: String -> @source_host: String
+ *  src_path: String -> @source_path: String
+ *  type: String -> @type: String
+ *  source: String -> @source: String
+ * 
+ * + * @see https + * ://github.com/logstash/logstash/wiki/logstash%27s-internal-message- + * format + */ +public class ElasticSearchLogStashEventSerializer implements + ElasticSearchEventSerializer { + + @Override + public XContentBuilder getContentBuilder(Event event) throws IOException { + XContentBuilder builder = jsonBuilder().startObject(); + appendBody(builder, event); + appendHeaders(builder, event); + return builder; + } + + private void appendBody(XContentBuilder builder, Event event) + throws IOException, UnsupportedEncodingException { + byte[] body = event.getBody(); + ContentBuilderUtil.appendField(builder, "@message", body); + } + + private void appendHeaders(XContentBuilder builder, Event event) + throws IOException { + Map headers = Maps.newHashMap(event.getHeaders()); + + String timestamp = headers.get("timestamp"); + if (!StringUtils.isBlank(timestamp) + && StringUtils.isBlank(headers.get("@timestamp"))) { + long timestampMs = Long.parseLong(timestamp); + builder.field("@timestamp", new Date(timestampMs)); + } + + String source = headers.get("source"); + if (!StringUtils.isBlank(source) + && StringUtils.isBlank(headers.get("@source"))) { + ContentBuilderUtil.appendField(builder, "@source", + source.getBytes(charset)); + } + + String type = headers.get("type"); + if (!StringUtils.isBlank(type) + && StringUtils.isBlank(headers.get("@type"))) { + ContentBuilderUtil.appendField(builder, "@type", type.getBytes(charset)); + } + + String host = headers.get("host"); + if (!StringUtils.isBlank(host) + && StringUtils.isBlank(headers.get("@source_host"))) { + ContentBuilderUtil.appendField(builder, "@source_host", + host.getBytes(charset)); + } + + String srcPath = headers.get("src_path"); + if (!StringUtils.isBlank(srcPath) + && StringUtils.isBlank(headers.get("@source_path"))) { + ContentBuilderUtil.appendField(builder, "@source_path", + srcPath.getBytes(charset)); + } + + builder.startObject("@fields"); + for (String key : headers.keySet()) { + byte[] val = headers.get(key).getBytes(charset); + ContentBuilderUtil.appendField(builder, key, val); + } + builder.endObject(); + } + + @Override + public void configure(Context context) { + // NO-OP... + } + + @Override + public void configure(ComponentConfiguration conf) { + // NO-OP... + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java new file mode 100644 index 0000000..05eb5ff --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_TTL; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.CounterGroup; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.BatchSizeSupported; +import org.apache.flume.formatter.output.BucketPath; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.AbstractSink; +import org.apache.flume.sink.elasticsearch.client.ElasticSearchClient; +import org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_INDEX_NAME_BUILDER_CLASS; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_SERIALIZER_CLASS; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX; + +/** + * A sink which reads events from a channel and writes them to ElasticSearch + * based on the work done by https://github.com/Aconex/elasticflume.git.

+ * + * This sink supports batch reading of events from the channel and writing them + * to ElasticSearch.

+ * + * Indexes will be rolled daily using the format 'indexname-YYYY-MM-dd' to allow + * easier management of the index

+ * + * This sink must be configured with with mandatory parameters detailed in + * {@link ElasticSearchSinkConstants}

It is recommended as a secondary step + * the ElasticSearch indexes are optimized for the specified serializer. This is + * not handled by the sink but is typically done by deploying a config template + * alongside the ElasticSearch deploy

+ * + * @see http + * ://www.elasticsearch.org/guide/reference/api/admin-indices-templates. + * html + */ +public class ElasticSearchSink extends AbstractSink implements Configurable, BatchSizeSupported { + + private static final Logger logger = LoggerFactory + .getLogger(ElasticSearchSink.class); + + // Used for testing + private boolean isLocal = false; + private final CounterGroup counterGroup = new CounterGroup(); + + private static final int defaultBatchSize = 100; + + private int batchSize = defaultBatchSize; + private long ttlMs = DEFAULT_TTL; + private String clusterName = DEFAULT_CLUSTER_NAME; + private String indexName = DEFAULT_INDEX_NAME; + private String indexType = DEFAULT_INDEX_TYPE; + private String clientType = DEFAULT_CLIENT_TYPE; + private final Pattern pattern = Pattern.compile(TTL_REGEX, + Pattern.CASE_INSENSITIVE); + private Matcher matcher = pattern.matcher(""); + + private String[] serverAddresses = null; + + private ElasticSearchClient client = null; + private Context elasticSearchClientContext = null; + + private ElasticSearchIndexRequestBuilderFactory indexRequestFactory; + private ElasticSearchEventSerializer eventSerializer; + private IndexNameBuilder indexNameBuilder; + private SinkCounter sinkCounter; + + /** + * Create an {@link ElasticSearchSink} configured using the supplied + * configuration + */ + public ElasticSearchSink() { + this(false); + } + + /** + * Create an {@link ElasticSearchSink}

+ * + * @param isLocal + * If true sink will be configured to only talk to an + * ElasticSearch instance hosted in the same JVM, should always be + * false is production + * + */ + @VisibleForTesting + ElasticSearchSink(boolean isLocal) { + this.isLocal = isLocal; + } + + @VisibleForTesting + String[] getServerAddresses() { + return serverAddresses; + } + + @VisibleForTesting + String getClusterName() { + return clusterName; + } + + @VisibleForTesting + String getIndexName() { + return indexName; + } + + @VisibleForTesting + String getIndexType() { + return indexType; + } + + @VisibleForTesting + long getTTLMs() { + return ttlMs; + } + + @VisibleForTesting + ElasticSearchEventSerializer getEventSerializer() { + return eventSerializer; + } + + @VisibleForTesting + IndexNameBuilder getIndexNameBuilder() { + return indexNameBuilder; + } + + @Override + public long getBatchSize() { + return batchSize; + } + + @Override + public Status process() throws EventDeliveryException { + logger.debug("processing..."); + Status status = Status.READY; + Channel channel = getChannel(); + Transaction txn = channel.getTransaction(); + try { + txn.begin(); + int count; + for (count = 0; count < batchSize; ++count) { + Event event = channel.take(); + + if (event == null) { + break; + } + String realIndexType = BucketPath.escapeString(indexType, event.getHeaders()); + client.addEvent(event, indexNameBuilder, realIndexType, ttlMs); + } + + if (count <= 0) { + sinkCounter.incrementBatchEmptyCount(); + counterGroup.incrementAndGet("channel.underflow"); + status = Status.BACKOFF; + } else { + if (count < batchSize) { + sinkCounter.incrementBatchUnderflowCount(); + status = Status.BACKOFF; + } else { + sinkCounter.incrementBatchCompleteCount(); + } + + sinkCounter.addToEventDrainAttemptCount(count); + client.execute(); + } + txn.commit(); + sinkCounter.addToEventDrainSuccessCount(count); + counterGroup.incrementAndGet("transaction.success"); + } catch (Throwable ex) { + try { + txn.rollback(); + counterGroup.incrementAndGet("transaction.rollback"); + } catch (Exception ex2) { + logger.error( + "Exception in rollback. Rollback might not have been successful.", + ex2); + } + + if (ex instanceof Error || ex instanceof RuntimeException) { + logger.error("Failed to commit transaction. Transaction rolled back.", + ex); + Throwables.propagate(ex); + } else { + logger.error("Failed to commit transaction. Transaction rolled back.", + ex); + throw new EventDeliveryException( + "Failed to commit transaction. Transaction rolled back.", ex); + } + } finally { + txn.close(); + } + return status; + } + + @Override + public void configure(Context context) { + if (!isLocal) { + if (StringUtils.isNotBlank(context.getString(HOSTNAMES))) { + serverAddresses = StringUtils.deleteWhitespace( + context.getString(HOSTNAMES)).split(","); + } + Preconditions.checkState(serverAddresses != null + && serverAddresses.length > 0, "Missing Param:" + HOSTNAMES); + } + + if (StringUtils.isNotBlank(context.getString(INDEX_NAME))) { + this.indexName = context.getString(INDEX_NAME); + } + + if (StringUtils.isNotBlank(context.getString(INDEX_TYPE))) { + this.indexType = context.getString(INDEX_TYPE); + } + + if (StringUtils.isNotBlank(context.getString(CLUSTER_NAME))) { + this.clusterName = context.getString(CLUSTER_NAME); + } + + if (StringUtils.isNotBlank(context.getString(BATCH_SIZE))) { + this.batchSize = Integer.parseInt(context.getString(BATCH_SIZE)); + } + + if (StringUtils.isNotBlank(context.getString(TTL))) { + this.ttlMs = parseTTL(context.getString(TTL)); + Preconditions.checkState(ttlMs > 0, TTL + + " must be greater than 0 or not set."); + } + + if (StringUtils.isNotBlank(context.getString(CLIENT_TYPE))) { + clientType = context.getString(CLIENT_TYPE); + } + + elasticSearchClientContext = new Context(); + elasticSearchClientContext.putAll(context.getSubProperties(CLIENT_PREFIX)); + + String serializerClazz = DEFAULT_SERIALIZER_CLASS; + if (StringUtils.isNotBlank(context.getString(SERIALIZER))) { + serializerClazz = context.getString(SERIALIZER); + } + + Context serializerContext = new Context(); + serializerContext.putAll(context.getSubProperties(SERIALIZER_PREFIX)); + + try { + @SuppressWarnings("unchecked") + Class clazz = (Class) Class + .forName(serializerClazz); + Configurable serializer = clazz.newInstance(); + + if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) { + indexRequestFactory + = (ElasticSearchIndexRequestBuilderFactory) serializer; + indexRequestFactory.configure(serializerContext); + } else if (serializer instanceof ElasticSearchEventSerializer) { + eventSerializer = (ElasticSearchEventSerializer) serializer; + eventSerializer.configure(serializerContext); + } else { + throw new IllegalArgumentException(serializerClazz + + " is not an ElasticSearchEventSerializer"); + } + } catch (Exception e) { + logger.error("Could not instantiate event serializer.", e); + Throwables.propagate(e); + } + + if (sinkCounter == null) { + sinkCounter = new SinkCounter(getName()); + } + + String indexNameBuilderClass = DEFAULT_INDEX_NAME_BUILDER_CLASS; + if (StringUtils.isNotBlank(context.getString(INDEX_NAME_BUILDER))) { + indexNameBuilderClass = context.getString(INDEX_NAME_BUILDER); + } + + Context indexnameBuilderContext = new Context(); + serializerContext.putAll( + context.getSubProperties(INDEX_NAME_BUILDER_PREFIX)); + + try { + @SuppressWarnings("unchecked") + Class clazz + = (Class) Class + .forName(indexNameBuilderClass); + indexNameBuilder = clazz.newInstance(); + indexnameBuilderContext.put(INDEX_NAME, indexName); + indexNameBuilder.configure(indexnameBuilderContext); + } catch (Exception e) { + logger.error("Could not instantiate index name builder.", e); + Throwables.propagate(e); + } + + if (sinkCounter == null) { + sinkCounter = new SinkCounter(getName()); + } + + Preconditions.checkState(StringUtils.isNotBlank(indexName), + "Missing Param:" + INDEX_NAME); + Preconditions.checkState(StringUtils.isNotBlank(indexType), + "Missing Param:" + INDEX_TYPE); + Preconditions.checkState(StringUtils.isNotBlank(clusterName), + "Missing Param:" + CLUSTER_NAME); + Preconditions.checkState(batchSize >= 1, BATCH_SIZE + + " must be greater than 0"); + } + + @Override + public void start() { + ElasticSearchClientFactory clientFactory = new ElasticSearchClientFactory(); + + logger.info("ElasticSearch sink {} started"); + sinkCounter.start(); + try { + if (isLocal) { + client = clientFactory.getLocalClient( + clientType, eventSerializer, indexRequestFactory); + } else { + client = clientFactory.getClient(clientType, serverAddresses, + clusterName, eventSerializer, indexRequestFactory); + client.configure(elasticSearchClientContext); + } + sinkCounter.incrementConnectionCreatedCount(); + } catch (Exception ex) { + ex.printStackTrace(); + sinkCounter.incrementConnectionFailedCount(); + if (client != null) { + client.close(); + sinkCounter.incrementConnectionClosedCount(); + } + } + + super.start(); + } + + @Override + public void stop() { + logger.info("ElasticSearch sink {} stopping"); + if (client != null) { + client.close(); + } + sinkCounter.incrementConnectionClosedCount(); + sinkCounter.stop(); + super.stop(); + } + + /* + * Returns TTL value of ElasticSearch index in milliseconds when TTL specifier + * is "ms" / "s" / "m" / "h" / "d" / "w". In case of unknown specifier TTL is + * not set. When specifier is not provided it defaults to days in milliseconds + * where the number of days is parsed integer from TTL string provided by + * user.

Elasticsearch supports ttl values being provided in the format: + * 1d / 1w / 1ms / 1s / 1h / 1m specify a time unit like d (days), m + * (minutes), h (hours), ms (milliseconds) or w (weeks), milliseconds is used + * as default unit. + * http://www.elasticsearch.org/guide/reference/mapping/ttl-field/. + * + * @param ttl TTL value provided by user in flume configuration file for the + * sink + * + * @return the ttl value in milliseconds + */ + private long parseTTL(String ttl) { + matcher = matcher.reset(ttl); + while (matcher.find()) { + if (matcher.group(2).equals("ms")) { + return Long.parseLong(matcher.group(1)); + } else if (matcher.group(2).equals("s")) { + return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("m")) { + return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("h")) { + return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("d")) { + return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("w")) { + return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("")) { + logger.info("TTL qualifier is empty. Defaulting to day qualifier."); + return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1))); + } else { + logger.debug("Unknown TTL qualifier provided. Setting TTL to 0."); + return 0; + } + } + logger.info("TTL not provided. Skipping the TTL config by returning 0."); + return 0; + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java new file mode 100644 index 0000000..da88def --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +public class ElasticSearchSinkConstants { + + /** + * Comma separated list of hostname:port, if the port is not present the + * default port '9300' will be used

+ * Example: + *
+   *  127.0.0.1:92001,127.0.0.2:9300
+   * 
+ */ + public static final String HOSTNAMES = "hostNames"; + + /** + * The name to index the document to, defaults to 'flume'

+ * The current date in the format 'yyyy-MM-dd' will be appended to this name, + * for example 'foo' will result in a daily index of 'foo-yyyy-MM-dd' + */ + public static final String INDEX_NAME = "indexName"; + + /** + * The type to index the document to, defaults to 'log' + */ + public static final String INDEX_TYPE = "indexType"; + + /** + * Name of the ElasticSearch cluster to connect to + */ + public static final String CLUSTER_NAME = "clusterName"; + + /** + * Maximum number of events the sink should take from the channel per + * transaction, if available. Defaults to 100 + */ + public static final String BATCH_SIZE = "batchSize"; + + /** + * TTL in days, when set will cause the expired documents to be deleted + * automatically, if not set documents will never be automatically deleted + */ + public static final String TTL = "ttl"; + + /** + * The fully qualified class name of the serializer the sink should use. + */ + public static final String SERIALIZER = "serializer"; + + /** + * Configuration to pass to the serializer. + */ + public static final String SERIALIZER_PREFIX = SERIALIZER + "."; + + /** + * The fully qualified class name of the index name builder the sink + * should use to determine name of index where the event should be sent. + */ + public static final String INDEX_NAME_BUILDER = "indexNameBuilder"; + + /** + * The fully qualified class name of the index name builder the sink + * should use to determine name of index where the event should be sent. + */ + public static final String INDEX_NAME_BUILDER_PREFIX + = INDEX_NAME_BUILDER + "."; + + /** + * The client type used for sending bulks to ElasticSearch + */ + public static final String CLIENT_TYPE = "client"; + + /** + * The client prefix to extract the configuration that will be passed to + * elasticsearch client. + */ + public static final String CLIENT_PREFIX = CLIENT_TYPE + "."; + + /** + * DEFAULTS USED BY THE SINK + */ + + public static final int DEFAULT_PORT = 9300; + public static final int DEFAULT_TTL = -1; + public static final String DEFAULT_INDEX_NAME = "flume"; + public static final String DEFAULT_INDEX_TYPE = "log"; + public static final String DEFAULT_CLUSTER_NAME = "elasticsearch"; + public static final String DEFAULT_CLIENT_TYPE = "transport"; + public static final String TTL_REGEX = "^(\\d+)(\\D*)"; + public static final String DEFAULT_SERIALIZER_CLASS = "org.apache.flume." + + "sink.elasticsearch.ElasticSearchLogStashEventSerializer"; + public static final String DEFAULT_INDEX_NAME_BUILDER_CLASS = + "org.apache.flume.sink.elasticsearch.TimeBasedIndexNameBuilder"; +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java new file mode 100644 index 0000000..d6cca50 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.io.BytesStream; + +/** + * Default implementation of {@link ElasticSearchIndexRequestBuilderFactory}. + * It serializes flume events using the + * {@link ElasticSearchEventSerializer} instance configured on the sink. + */ +public class EventSerializerIndexRequestBuilderFactory + extends AbstractElasticSearchIndexRequestBuilderFactory { + + protected final ElasticSearchEventSerializer serializer; + + public EventSerializerIndexRequestBuilderFactory( + ElasticSearchEventSerializer serializer) { + this(serializer, ElasticSearchIndexRequestBuilderFactory.df); + } + + protected EventSerializerIndexRequestBuilderFactory( + ElasticSearchEventSerializer serializer, FastDateFormat fdf) { + super(fdf); + this.serializer = serializer; + } + + @Override + public void configure(Context context) { + serializer.configure(context); + } + + @Override + public void configure(ComponentConfiguration config) { + serializer.configure(config); + } + + @Override + protected void prepareIndexRequest(IndexRequestBuilder indexRequest, + String indexName, String indexType, Event event) throws IOException { + BytesStream contentBuilder = serializer.getContentBuilder(event); + indexRequest.setIndex(indexName) + .setType(indexType) + .setSource(contentBuilder.bytes()); + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java new file mode 100644 index 0000000..1dd4415 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/IndexNameBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; + +public interface IndexNameBuilder extends Configurable, + ConfigurableComponent { + /** + * Gets the name of the index to use for an index request + * @param event + * Event which determines index name + * @return index name of the form 'indexPrefix-indexDynamicName' + */ + public String getIndexName(Event event); + + /** + * Gets the prefix of index to use for an index request. + * @param event + * Event which determines index name + * @return Index prefix name + */ + public String getIndexPrefix(Event event); +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java new file mode 100644 index 0000000..801cac9 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java @@ -0,0 +1,46 @@ +/* + * Copyright 2014 Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.formatter.output.BucketPath; + +public class SimpleIndexNameBuilder implements IndexNameBuilder { + + private String indexName; + + @Override + public String getIndexName(Event event) { + return BucketPath.escapeString(indexName, event.getHeaders()); + } + + @Override + public String getIndexPrefix(Event event) { + return BucketPath.escapeString(indexName, event.getHeaders()); + } + + @Override + public void configure(Context context) { + indexName = context.getString(ElasticSearchSinkConstants.INDEX_NAME); + } + + @Override + public void configure(ComponentConfiguration conf) { + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java new file mode 100644 index 0000000..c651732 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.formatter.output.BucketPath; + +import java.util.TimeZone; + +/** + * Default index name builder. It prepares name of index using configured + * prefix and current timestamp. Default format of name is prefix-yyyy-MM-dd". + */ +public class TimeBasedIndexNameBuilder implements + IndexNameBuilder { + + public static final String DATE_FORMAT = "dateFormat"; + public static final String TIME_ZONE = "timeZone"; + + public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd"; + public static final String DEFAULT_TIME_ZONE = "Etc/UTC"; + + private FastDateFormat fastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd", + TimeZone.getTimeZone("Etc/UTC")); + + private String indexPrefix; + + @VisibleForTesting + FastDateFormat getFastDateFormat() { + return fastDateFormat; + } + + /** + * Gets the name of the index to use for an index request + * @param event + * Event for which the name of index has to be prepared + * @return index name of the form 'indexPrefix-formattedTimestamp' + */ + @Override + public String getIndexName(Event event) { + TimestampedEvent timestampedEvent = new TimestampedEvent(event); + long timestamp = timestampedEvent.getTimestamp(); + String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders()); + return new StringBuilder(realIndexPrefix).append('-') + .append(fastDateFormat.format(timestamp)).toString(); + } + + @Override + public String getIndexPrefix(Event event) { + return BucketPath.escapeString(indexPrefix, event.getHeaders()); + } + + @Override + public void configure(Context context) { + String dateFormatString = context.getString(DATE_FORMAT); + String timeZoneString = context.getString(TIME_ZONE); + if (StringUtils.isBlank(dateFormatString)) { + dateFormatString = DEFAULT_DATE_FORMAT; + } + if (StringUtils.isBlank(timeZoneString)) { + timeZoneString = DEFAULT_TIME_ZONE; + } + fastDateFormat = FastDateFormat.getInstance(dateFormatString, + TimeZone.getTimeZone(timeZoneString)); + indexPrefix = context.getString(ElasticSearchSinkConstants.INDEX_NAME); + } + + @Override + public void configure(ComponentConfiguration conf) { + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java new file mode 100644 index 0000000..c056839 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimestampedEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.common.collect.Maps; +import org.apache.commons.lang.StringUtils; +import org.apache.flume.Event; +import org.apache.flume.event.SimpleEvent; +import org.joda.time.DateTimeUtils; + +import java.util.Map; + +/** + * {@link org.apache.flume.Event} implementation that has a timestamp. + * The timestamp is taken from (in order of precedence):
    + *
  1. The "timestamp" header of the base event, if present
  2. + *
  3. The "@timestamp" header of the base event, if present
  4. + *
  5. The current time in millis, otherwise
  6. + *
+ */ +final class TimestampedEvent extends SimpleEvent { + + private final long timestamp; + + TimestampedEvent(Event base) { + setBody(base.getBody()); + Map headers = Maps.newHashMap(base.getHeaders()); + String timestampString = headers.get("timestamp"); + if (StringUtils.isBlank(timestampString)) { + timestampString = headers.get("@timestamp"); + } + if (StringUtils.isBlank(timestampString)) { + this.timestamp = DateTimeUtils.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp )); + } else { + this.timestamp = Long.valueOf(timestampString); + } + setHeaders(headers); + } + + long getTimestamp() { + return timestamp; + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java new file mode 100644 index 0000000..655e00a --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; + +/** + * Interface for an ElasticSearch client which is responsible for sending bulks + * of events to ElasticSearch. + */ +public interface ElasticSearchClient extends Configurable { + + /** + * Close connection to elastic search in client + */ + void close(); + + /** + * Add new event to the bulk + * + * @param event + * Flume Event + * @param indexNameBuilder + * Index name builder which generates name of index to feed + * @param indexType + * Name of type of document which will be sent to the elasticsearch cluster + * @param ttlMs + * Time to live expressed in milliseconds. Value <= 0 is ignored + * @throws Exception + */ + public void addEvent(Event event, IndexNameBuilder indexNameBuilder, + String indexType, long ttlMs) throws Exception; + + /** + * Sends bulk to the elasticsearch cluster + * + * @throws Exception + */ + void execute() throws Exception; +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java new file mode 100644 index 0000000..986fb2b --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory; + +/** + * Internal ElasticSearch client factory. Responsible for creating instance + * of ElasticSearch clients. + */ +public class ElasticSearchClientFactory { + public static final String TransportClient = "transport"; + public static final String RestClient = "rest"; + + /** + * + * @param clientType + * String representation of client type + * @param hostNames + * Array of strings that represents hostnames with ports (hostname:port) + * @param clusterName + * Elasticsearch cluster name used only by Transport Client + * @param serializer + * Serializer of flume events to elasticsearch documents + * @return + */ + public ElasticSearchClient getClient(String clientType, String[] hostNames, + String clusterName, ElasticSearchEventSerializer serializer, + ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException { + if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) { + return new ElasticSearchTransportClient(hostNames, clusterName, serializer); + } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null) { + return new ElasticSearchTransportClient(hostNames, clusterName, indexBuilder); + } else if (clientType.equalsIgnoreCase(RestClient) && serializer != null) { + return new ElasticSearchRestClient(hostNames, serializer); + } + throw new NoSuchClientTypeException(); + } + + /** + * Used for tests only. Creates local elasticsearch instance client. + * + * @param clientType Name of client to use + * @param serializer Serializer for the event + * @param indexBuilder Index builder factory + * + * @return Local elastic search instance client + */ + public ElasticSearchClient getLocalClient(String clientType, + ElasticSearchEventSerializer serializer, + ElasticSearchIndexRequestBuilderFactory indexBuilder) + throws NoSuchClientTypeException { + if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) { + return new ElasticSearchTransportClient(serializer); + } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null) { + return new ElasticSearchTransportClient(indexBuilder); + } else if (clientType.equalsIgnoreCase(RestClient)) { + } + throw new NoSuchClientTypeException(); + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java new file mode 100644 index 0000000..e51efe2 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.common.bytes.BytesReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Rest ElasticSearch client which is responsible for sending bulks of events to + * ElasticSearch using ElasticSearch HTTP API. This is configurable, so any + * config params required should be taken through this. + */ +public class ElasticSearchRestClient implements ElasticSearchClient { + + private static final String INDEX_OPERATION_NAME = "index"; + private static final String INDEX_PARAM = "_index"; + private static final String TYPE_PARAM = "_type"; + private static final String TTL_PARAM = "_ttl"; + private static final String BULK_ENDPOINT = "_bulk"; + + private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class); + + private final ElasticSearchEventSerializer serializer; + private final RoundRobinList serversList; + + private StringBuilder bulkBuilder; + private HttpClient httpClient; + + public ElasticSearchRestClient(String[] hostNames, + ElasticSearchEventSerializer serializer) { + + for (int i = 0; i < hostNames.length; ++i) { + if (!hostNames[i].contains("http://") && !hostNames[i].contains("https://")) { + hostNames[i] = "http://" + hostNames[i]; + } + } + this.serializer = serializer; + + serversList = new RoundRobinList(Arrays.asList(hostNames)); + httpClient = new DefaultHttpClient(); + bulkBuilder = new StringBuilder(); + } + + @VisibleForTesting + public ElasticSearchRestClient(String[] hostNames, + ElasticSearchEventSerializer serializer, HttpClient client) { + this(hostNames, serializer); + httpClient = client; + } + + @Override + public void configure(Context context) { + } + + @Override + public void close() { + } + + @Override + public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, + long ttlMs) throws Exception { + BytesReference content = serializer.getContentBuilder(event).bytes(); + Map> parameters = new HashMap>(); + Map indexParameters = new HashMap(); + indexParameters.put(INDEX_PARAM, indexNameBuilder.getIndexName(event)); + indexParameters.put(TYPE_PARAM, indexType); + if (ttlMs > 0) { + indexParameters.put(TTL_PARAM, Long.toString(ttlMs)); + } + parameters.put(INDEX_OPERATION_NAME, indexParameters); + + Gson gson = new Gson(); + synchronized (bulkBuilder) { + bulkBuilder.append(gson.toJson(parameters)); + bulkBuilder.append("\n"); + bulkBuilder.append(content.toBytesArray().toUtf8()); + bulkBuilder.append("\n"); + } + } + + @Override + public void execute() throws Exception { + int statusCode = 0, triesCount = 0; + HttpResponse response = null; + String entity; + synchronized (bulkBuilder) { + entity = bulkBuilder.toString(); + bulkBuilder = new StringBuilder(); + } + + while (statusCode != HttpStatus.SC_OK && triesCount < serversList.size()) { + triesCount++; + String host = serversList.get(); + String url = host + "/" + BULK_ENDPOINT; + HttpPost httpRequest = new HttpPost(url); + httpRequest.setEntity(new StringEntity(entity)); + response = httpClient.execute(httpRequest); + statusCode = response.getStatusLine().getStatusCode(); + logger.info("Status code from elasticsearch: " + statusCode); + if (response.getEntity() != null) { + logger.debug("Status message from elasticsearch: " + + EntityUtils.toString(response.getEntity(), "UTF-8")); + } + } + + if (statusCode != HttpStatus.SC_OK) { + if (response.getEntity() != null) { + throw new EventDeliveryException(EntityUtils.toString(response.getEntity(), "UTF-8")); + } else { + throw new EventDeliveryException("Elasticsearch status code was: " + statusCode); + } + } + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java new file mode 100644 index 0000000..2cf365e --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_PORT; + +public class ElasticSearchTransportClient implements ElasticSearchClient { + + public static final Logger logger = LoggerFactory + .getLogger(ElasticSearchTransportClient.class); + + private InetSocketTransportAddress[] serverAddresses; + private ElasticSearchEventSerializer serializer; + private ElasticSearchIndexRequestBuilderFactory indexRequestBuilderFactory; + private BulkRequestBuilder bulkRequestBuilder; + + private Client client; + + @VisibleForTesting + InetSocketTransportAddress[] getServerAddresses() { + return serverAddresses; + } + + @VisibleForTesting + void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) { + this.bulkRequestBuilder = bulkRequestBuilder; + } + + /** + * Transport client for external cluster + * + * @param hostNames + * @param clusterName + * @param serializer + */ + public ElasticSearchTransportClient(String[] hostNames, String clusterName, + ElasticSearchEventSerializer serializer) { + configureHostnames(hostNames); + this.serializer = serializer; + openClient(clusterName); + } + + public ElasticSearchTransportClient(String[] hostNames, String clusterName, + ElasticSearchIndexRequestBuilderFactory indexBuilder) { + configureHostnames(hostNames); + this.indexRequestBuilderFactory = indexBuilder; + openClient(clusterName); + } + + /** + * Local transport client only for testing + * + * @param indexBuilderFactory + */ + public ElasticSearchTransportClient(ElasticSearchIndexRequestBuilderFactory indexBuilderFactory) { + this.indexRequestBuilderFactory = indexBuilderFactory; + openLocalDiscoveryClient(); + } + + /** + * Local transport client only for testing + * + * @param serializer + */ + public ElasticSearchTransportClient(ElasticSearchEventSerializer serializer) { + this.serializer = serializer; + openLocalDiscoveryClient(); + } + + /** + * Used for testing + * + * @param client + * ElasticSearch Client + * @param serializer + * Event Serializer + */ + public ElasticSearchTransportClient(Client client, + ElasticSearchEventSerializer serializer) { + this.client = client; + this.serializer = serializer; + } + + /** + * Used for testing + */ + public ElasticSearchTransportClient(Client client, + ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) + throws IOException { + this.client = client; + requestBuilderFactory.createIndexRequest(client, null, null, null); + } + + private void configureHostnames(String[] hostNames) { + logger.warn(Arrays.toString(hostNames)); + serverAddresses = new InetSocketTransportAddress[hostNames.length]; + for (int i = 0; i < hostNames.length; i++) { + String[] hostPort = hostNames[i].trim().split(":"); + String host = hostPort[0].trim(); + int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim()) + : DEFAULT_PORT; + serverAddresses[i] = new InetSocketTransportAddress(host, port); + } + } + + @Override + public void close() { + if (client != null) { + client.close(); + } + client = null; + } + + @Override + public void addEvent(Event event, IndexNameBuilder indexNameBuilder, + String indexType, long ttlMs) throws Exception { + if (bulkRequestBuilder == null) { + bulkRequestBuilder = client.prepareBulk(); + } + + IndexRequestBuilder indexRequestBuilder = null; + if (indexRequestBuilderFactory == null) { + indexRequestBuilder = client + .prepareIndex(indexNameBuilder.getIndexName(event), indexType) + .setSource(serializer.getContentBuilder(event).bytes()); + } else { + indexRequestBuilder = indexRequestBuilderFactory.createIndexRequest( + client, indexNameBuilder.getIndexPrefix(event), indexType, event); + } + + if (ttlMs > 0) { + indexRequestBuilder.setTTL(ttlMs); + } + bulkRequestBuilder.add(indexRequestBuilder); + } + + @Override + public void execute() throws Exception { + try { + BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); + if (bulkResponse.hasFailures()) { + throw new EventDeliveryException(bulkResponse.buildFailureMessage()); + } + } finally { + bulkRequestBuilder = client.prepareBulk(); + } + } + + /** + * Open client to elaticsearch cluster + * + * @param clusterName + */ + private void openClient(String clusterName) { + logger.info("Using ElasticSearch hostnames: {} ", + Arrays.toString(serverAddresses)); + Settings settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", clusterName).build(); + + TransportClient transportClient = new TransportClient(settings); + for (InetSocketTransportAddress host : serverAddresses) { + transportClient.addTransportAddress(host); + } + if (client != null) { + client.close(); + } + client = transportClient; + } + + /* + * FOR TESTING ONLY... + * + * Opens a local discovery node for talking to an elasticsearch server running + * in the same JVM + */ + private void openLocalDiscoveryClient() { + logger.info("Using ElasticSearch AutoDiscovery mode"); + Node node = NodeBuilder.nodeBuilder().client(true).local(true).node(); + if (client != null) { + client.close(); + } + client = node.client(); + } + + @Override + public void configure(Context context) { + //To change body of implemented methods use File | Settings | File Templates. + } +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java new file mode 100644 index 0000000..41fbe0d --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/NoSuchClientTypeException.java @@ -0,0 +1,23 @@ +/* + * Copyright 2014 Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.elasticsearch.client; + +/** + * Exception class + */ +class NoSuchClientTypeException extends Exception { +} diff --git a/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java new file mode 100644 index 0000000..4cbbe91 --- /dev/null +++ b/flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java @@ -0,0 +1,44 @@ +package org.apache.flume.sink.elasticsearch.client; + +import java.util.Collection; +import java.util.Iterator; + +/* + * Copyright 2014 Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class RoundRobinList { + + private Iterator iterator; + private final Collection elements; + + public RoundRobinList(Collection elements) { + this.elements = elements; + iterator = this.elements.iterator(); + } + + public synchronized T get() { + if (iterator.hasNext()) { + return iterator.next(); + } else { + iterator = elements.iterator(); + return iterator.next(); + } + } + + public int size() { + return elements.size(); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java new file mode 100644 index 0000000..9fbd747 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.Gateway; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.joda.time.DateTimeUtils; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.junit.Assert.assertEquals; + +public abstract class AbstractElasticSearchSinkTest { + + static final String DEFAULT_INDEX_NAME = "flume"; + static final String DEFAULT_INDEX_TYPE = "log"; + static final String DEFAULT_CLUSTER_NAME = "elasticsearch"; + static final long FIXED_TIME_MILLIS = 123456789L; + + Node node; + Client client; + String timestampedIndexName; + Map parameters; + + void initDefaults() { + parameters = Maps.newHashMap(); + parameters.put(INDEX_NAME, DEFAULT_INDEX_NAME); + parameters.put(INDEX_TYPE, DEFAULT_INDEX_TYPE); + parameters.put(CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + parameters.put(BATCH_SIZE, "1"); + parameters.put(TTL, "5"); + + timestampedIndexName = DEFAULT_INDEX_NAME + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS); + } + + void createNodes() throws Exception { + Settings settings = ImmutableSettings + .settingsBuilder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .put("routing.hash.type", "simple") + .put("gateway.type", "none") + .put("path.data", "target/es-test") + .build(); + + node = NodeBuilder.nodeBuilder().settings(settings).local(true).node(); + client = node.client(); + + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute() + .actionGet(); + } + + void shutdownNodes() throws Exception { + ((InternalNode) node).injector().getInstance(Gateway.class).reset(); + client.close(); + node.close(); + } + + @Before + public void setFixedJodaTime() { + DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS); + } + + @After + public void resetJodaTime() { + DateTimeUtils.setCurrentMillisSystem(); + } + + Channel bindAndStartChannel(ElasticSearchSink fixture) { + // Configure the channel + Channel channel = new MemoryChannel(); + Configurables.configure(channel, new Context()); + + // Wire them together + fixture.setChannel(channel); + fixture.start(); + return channel; + } + + void assertMatchAllQuery(int expectedHits, Event... events) { + assertSearch(expectedHits, performSearch(QueryBuilders.matchAllQuery()), + null, events); + } + + void assertBodyQuery(int expectedHits, Event... events) { + // Perform Multi Field Match + assertSearch(expectedHits, + performSearch(QueryBuilders.fieldQuery("@message", "event")), + null, events); + } + + SearchResponse performSearch(QueryBuilder query) { + return client.prepareSearch(timestampedIndexName) + .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet(); + } + + void assertSearch(int expectedHits, SearchResponse response, Map expectedBody, + Event... events) { + SearchHits hitResponse = response.getHits(); + assertEquals(expectedHits, hitResponse.getTotalHits()); + + SearchHit[] hits = hitResponse.getHits(); + Arrays.sort(hits, new Comparator() { + @Override + public int compare(SearchHit o1, SearchHit o2) { + return o1.getSourceAsString().compareTo(o2.getSourceAsString()); + } + }); + + for (int i = 0; i < events.length; i++) { + Event event = events[i]; + SearchHit hit = hits[i]; + Map source = hit.getSource(); + if (expectedBody == null) { + assertEquals(new String(event.getBody()), source.get("@message")); + } else { + assertEquals(expectedBody, source.get("@message")); + } + } + } + +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java new file mode 100644 index 0000000..d4e4654 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchDynamicSerializer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Test; + +import java.util.Map; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + +public class TestElasticSearchDynamicSerializer { + + @Test + public void testRoundTrip() throws Exception { + ElasticSearchDynamicSerializer fixture = new ElasticSearchDynamicSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "test body"; + Map headers = Maps.newHashMap(); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("headerNameThree", "headerValueThree"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("body", new String(message.getBytes(), charset)); + for (String headerName : headers.keySet()) { + expected.field(headerName, new String(headers.get(headerName).getBytes(), + charset)); + } + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + + assertEquals(new String(expected.bytes().array()), new String(actual + .bytes().array())); + + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..b62254e --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.common.collect.Maps; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.sink.SinkConfiguration; +import org.apache.flume.event.SimpleEvent; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.BytesStream; +import org.elasticsearch.common.io.FastByteArrayOutputStream; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestElasticSearchIndexRequestBuilderFactory + extends AbstractElasticSearchSinkTest { + + private static final Client FAKE_CLIENT = null; + + private EventSerializerIndexRequestBuilderFactory factory; + + private FakeEventSerializer serializer; + + @Before + public void setupFactory() throws Exception { + serializer = new FakeEventSerializer(); + factory = new EventSerializerIndexRequestBuilderFactory(serializer) { + @Override + IndexRequestBuilder prepareIndex(Client client) { + return new IndexRequestBuilder(FAKE_CLIENT); + } + }; + } + + @Test + public void shouldUseUtcAsBasisForDateFormat() { + assertEquals("Coordinated Universal Time", + factory.fastDateFormat.getTimeZone().getDisplayName()); + } + + @Test + public void indexNameShouldBePrefixDashFormattedTimestamp() { + long millis = 987654321L; + assertEquals("prefix-" + factory.fastDateFormat.format(millis), + factory.getIndexName("prefix", millis)); + } + + @Test + public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp()); + assertEquals(String.valueOf(FIXED_TIME_MILLIS), + timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("timestamp", "-321"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-321L, timestampedEvent.getTimestamp()); + assertEquals("-321", timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("@timestamp", "-999"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-999L, timestampedEvent.getTimestamp()); + assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp")); + assertNull(timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + base.setBody(new byte[] {1,2,3,4}); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("foo", "bar"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals("bar", timestampedEvent.getHeaders().get("foo")); + assertArrayEquals(base.getBody(), timestampedEvent.getBody()); + } + + @Test + public void shouldSetIndexNameTypeAndSerializedEventIntoIndexRequest() + throws Exception { + + String indexPrefix = "qwerty"; + String indexType = "uiop"; + Event event = new SimpleEvent(); + + IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest( + FAKE_CLIENT, indexPrefix, indexType, event); + + assertEquals(indexPrefix + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS), + indexRequestBuilder.request().index()); + assertEquals(indexType, indexRequestBuilder.request().type()); + assertArrayEquals(FakeEventSerializer.FAKE_BYTES, + indexRequestBuilder.request().source().array()); + } + + @Test + public void shouldSetIndexNameFromTimestampHeaderWhenPresent() + throws Exception { + String indexPrefix = "qwerty"; + String indexType = "uiop"; + Event event = new SimpleEvent(); + event.getHeaders().put("timestamp", "1213141516"); + + IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest( + null, indexPrefix, indexType, event); + + assertEquals(indexPrefix + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L), + indexRequestBuilder.request().index()); + } + + @Test + public void shouldSetIndexNameTypeFromHeaderWhenPresent() + throws Exception { + String indexPrefix = "%{index-name}"; + String indexType = "%{index-type}"; + String indexValue = "testing-index-name-from-headers"; + String typeValue = "testing-index-type-from-headers"; + + Event event = new SimpleEvent(); + event.getHeaders().put("index-name", indexValue); + event.getHeaders().put("index-type", typeValue); + + IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest( + null, indexPrefix, indexType, event); + + assertEquals(indexValue + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS), + indexRequestBuilder.request().index()); + assertEquals(typeValue, indexRequestBuilder.request().type()); + } + + @Test + public void shouldConfigureEventSerializer() throws Exception { + assertFalse(serializer.configuredWithContext); + factory.configure(new Context()); + assertTrue(serializer.configuredWithContext); + + assertFalse(serializer.configuredWithComponentConfiguration); + factory.configure(new SinkConfiguration("name")); + assertTrue(serializer.configuredWithComponentConfiguration); + } + + static class FakeEventSerializer implements ElasticSearchEventSerializer { + + static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6}; + boolean configuredWithContext; + boolean configuredWithComponentConfiguration; + + @Override + public BytesStream getContentBuilder(Event event) throws IOException { + FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4); + fbaos.write(FAKE_BYTES); + return fbaos; + } + + @Override + public void configure(Context arg0) { + configuredWithContext = true; + } + + @Override + public void configure(ComponentConfiguration arg0) { + configuredWithComponentConfiguration = true; + } + } + +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java new file mode 100644 index 0000000..65b4dab --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.gson.JsonParser; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.common.collect.Maps; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.junit.Test; + +import java.util.Date; +import java.util.Map; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.junit.Assert.assertEquals; + +public class TestElasticSearchLogStashEventSerializer { + + @Test + public void testRoundTrip() throws Exception { + ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "test body"; + Map headers = Maps.newHashMap(); + long timestamp = System.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp)); + headers.put("source", "flume_tail_src"); + headers.put("host", "test@localhost"); + headers.put("src_path", "/tmp/test"); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("type", "sometype"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", "flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + + JsonParser parser = new JsonParser(); + assertEquals(parser.parse(expected.string()),parser.parse(actual.string())); + } + + @Test + public void shouldHandleInvalidJSONDuringComplexParsing() throws Exception { + ElasticSearchLogStashEventSerializer fixture = new ElasticSearchLogStashEventSerializer(); + Context context = new Context(); + fixture.configure(context); + + String message = "{flume: somethingnotvalid}"; + Map headers = Maps.newHashMap(); + long timestamp = System.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp)); + headers.put("source", "flume_tail_src"); + headers.put("host", "test@localhost"); + headers.put("src_path", "/tmp/test"); + headers.put("headerNameOne", "headerValueOne"); + headers.put("headerNameTwo", "headerValueTwo"); + headers.put("type", "sometype"); + Event event = EventBuilder.withBody(message.getBytes(charset)); + event.setHeaders(headers); + + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", "flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); + + XContentBuilder actual = fixture.getContentBuilder(event); + + JsonParser parser = new JsonParser(); + assertEquals(parser.parse(expected.string()),parser.parse(actual.string())); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java new file mode 100644 index 0000000..69acc06 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Sink.Status; +import org.apache.flume.Transaction; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.UUID; +import org.elasticsearch.common.io.BytesStream; +import org.elasticsearch.common.io.FastByteArrayOutputStream; +import org.elasticsearch.index.query.QueryBuilders; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { + + private ElasticSearchSink fixture; + + @Before + public void init() throws Exception { + initDefaults(); + createNodes(); + fixture = new ElasticSearchSink(true); + fixture.setName("ElasticSearchSink-" + UUID.randomUUID().toString()); + } + + @After + public void tearDown() throws Exception { + shutdownNodes(); + } + + @Test + public void shouldIndexOneEvent() throws Exception { + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody("event #1 or 1".getBytes()); + channel.put(event); + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(1, event); + assertBodyQuery(1, event); + } + + @Test + public void shouldIndexInvalidComplexJsonBody() throws Exception { + parameters.put(BATCH_SIZE, "3"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event event1 = EventBuilder.withBody("TEST1 {test}".getBytes()); + channel.put(event1); + Event event2 = EventBuilder.withBody("{test: TEST2 }".getBytes()); + channel.put(event2); + Event event3 = EventBuilder.withBody("{\"test\":{ TEST3 {test} }}".getBytes()); + channel.put(event3); + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(3); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST1")), + null, event1); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST2")), + null, event2); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message", "TEST3")), + null, event3); + } + + @Test + public void shouldIndexComplexJsonEvent() throws Exception { + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + Transaction tx = channel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody( + "{\"event\":\"json content\",\"num\":1}".getBytes()); + channel.put(event); + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + Map expectedBody = new HashMap(); + expectedBody.put("event", "json content"); + expectedBody.put("num", 1); + + assertSearch(1, + performSearch(QueryBuilders.matchAllQuery()), expectedBody, event); + assertSearch(1, + performSearch(QueryBuilders.fieldQuery("@message.event", "json")), + expectedBody, event); + } + + @Test + public void shouldIndexFiveEvents() throws Exception { + // Make it so we only need to call process once + parameters.put(BATCH_SIZE, "5"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + int numberOfEvents = 5; + Event[] events = new Event[numberOfEvents]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < numberOfEvents; i++) { + String body = "event #" + i + " of " + numberOfEvents; + Event event = EventBuilder.withBody(body.getBytes()); + events[i] = event; + channel.put(event); + } + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + + assertMatchAllQuery(numberOfEvents, events); + assertBodyQuery(5, events); + } + + @Test + public void shouldIndexFiveEventsOverThreeBatches() throws Exception { + parameters.put(BATCH_SIZE, "2"); + Configurables.configure(fixture, new Context(parameters)); + Channel channel = bindAndStartChannel(fixture); + + int numberOfEvents = 5; + Event[] events = new Event[numberOfEvents]; + + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < numberOfEvents; i++) { + String body = "event #" + i + " of " + numberOfEvents; + Event event = EventBuilder.withBody(body.getBytes()); + events[i] = event; + channel.put(event); + } + tx.commit(); + tx.close(); + + int count = 0; + Status status = Status.READY; + while (status != Status.BACKOFF) { + count++; + status = fixture.process(); + } + fixture.stop(); + + assertEquals(3, count); + + client.admin().indices() + .refresh(Requests.refreshRequest(timestampedIndexName)).actionGet(); + assertMatchAllQuery(numberOfEvents, events); + assertBodyQuery(5, events); + } + + @Test + public void shouldParseConfiguration() { + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.put(CLUSTER_NAME, "testing-cluster-name"); + parameters.put(INDEX_NAME, "testing-index-name"); + parameters.put(INDEX_TYPE, "testing-index-type"); + parameters.put(TTL, "10"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27" }; + + assertEquals("testing-cluster-name", fixture.getClusterName()); + assertEquals("testing-index-name", fixture.getIndexName()); + assertEquals("testing-index-type", fixture.getIndexType()); + assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs()); + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseConfigurationUsingDefaults() { + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.remove(INDEX_NAME); + parameters.remove(INDEX_TYPE); + parameters.remove(CLUSTER_NAME); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27" }; + + assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName()); + assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType()); + assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName()); + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostUsingDefaultPorts() { + parameters.put(HOSTNAMES, "10.5.5.27,10.5.5.28,10.5.5.29"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostWithWhitespacesUsingDefaultPorts() { + parameters.put(HOSTNAMES, " 10.5.5.27 , 10.5.5.28 , 10.5.5.29 "); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27", "10.5.5.28", "10.5.5.29" }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostAndPorts() { + parameters.put(HOSTNAMES, "10.5.5.27:9300,10.5.5.28:9301,10.5.5.29:9302"); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldParseMultipleHostAndPortsWithWhitespaces() { + parameters.put(HOSTNAMES, + " 10.5.5.27 : 9300 , 10.5.5.28 : 9301 , 10.5.5.29 : 9302 "); + + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27:9300", "10.5.5.28:9301", "10.5.5.29:9302" }; + + assertArrayEquals(expected, fixture.getServerAddresses()); + } + + @Test + public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory() + throws Exception { + parameters.put(SERIALIZER, + CustomElasticSearchIndexRequestBuilderFactory.class.getName()); + + fixture.configure(new Context(parameters)); + + Channel channel = bindAndStartChannel(fixture); + Transaction tx = channel.getTransaction(); + tx.begin(); + String body = "{ foo: \"bar\" }"; + Event event = EventBuilder.withBody(body.getBytes()); + channel.put(event); + tx.commit(); + tx.close(); + + fixture.process(); + fixture.stop(); + + assertEquals(fixture.getIndexName() + "-05_17_36_789", + CustomElasticSearchIndexRequestBuilderFactory.actualIndexName); + assertEquals(fixture.getIndexType(), + CustomElasticSearchIndexRequestBuilderFactory.actualIndexType); + assertArrayEquals(event.getBody(), + CustomElasticSearchIndexRequestBuilderFactory.actualEventBody); + assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext); + } + + @Test + public void shouldParseFullyQualifiedTTLs() { + Map testTTLMap = new HashMap(); + testTTLMap.put("1ms", Long.valueOf(1)); + testTTLMap.put("1s", Long.valueOf(1000)); + testTTLMap.put("1m", Long.valueOf(60000)); + testTTLMap.put("1h", Long.valueOf(3600000)); + testTTLMap.put("1d", Long.valueOf(86400000)); + testTTLMap.put("1w", Long.valueOf(604800000)); + testTTLMap.put("1", Long.valueOf(86400000)); + + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.put(CLUSTER_NAME, "testing-cluster-name"); + parameters.put(INDEX_NAME, "testing-index-name"); + parameters.put(INDEX_TYPE, "testing-index-type"); + + for (String ttl : testTTLMap.keySet()) { + parameters.put(TTL, ttl); + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + String[] expected = { "10.5.5.27" }; + assertEquals("testing-cluster-name", fixture.getClusterName()); + assertEquals("testing-index-name", fixture.getIndexName()); + assertEquals("testing-index-type", fixture.getIndexType()); + assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs()); + assertArrayEquals(expected, fixture.getServerAddresses()); + + } + } + + public static final class CustomElasticSearchIndexRequestBuilderFactory + extends AbstractElasticSearchIndexRequestBuilderFactory { + + static String actualIndexName; + static String actualIndexType; + static byte[] actualEventBody; + static boolean hasContext; + + public CustomElasticSearchIndexRequestBuilderFactory() { + super(FastDateFormat.getInstance("HH_mm_ss_SSS", TimeZone.getTimeZone("EST5EDT"))); + } + + @Override + protected void prepareIndexRequest(IndexRequestBuilder indexRequest, String indexName, + String indexType, Event event) throws IOException { + actualIndexName = indexName; + actualIndexType = indexType; + actualEventBody = event.getBody(); + indexRequest.setIndex(indexName).setType(indexType).setSource(event.getBody()); + } + + @Override + public void configure(Context arg0) { + hasContext = true; + } + + @Override + public void configure(ComponentConfiguration arg0) { + //no-op + } + } + + @Test + public void shouldFailToConfigureWithInvalidSerializerClass() + throws Exception { + + parameters.put(SERIALIZER, "java.lang.String"); + try { + Configurables.configure(fixture, new Context(parameters)); + } catch (ClassCastException e) { + // expected + } + + parameters.put(SERIALIZER, FakeConfigurable.class.getName()); + try { + Configurables.configure(fixture, new Context(parameters)); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void shouldUseSpecifiedSerializer() throws Exception { + Context context = new Context(); + context.put(SERIALIZER, + "org.apache.flume.sink.elasticsearch.FakeEventSerializer"); + + assertNull(fixture.getEventSerializer()); + fixture.configure(context); + assertTrue(fixture.getEventSerializer() instanceof FakeEventSerializer); + } + + @Test + public void shouldUseSpecifiedIndexNameBuilder() throws Exception { + Context context = new Context(); + context.put(ElasticSearchSinkConstants.INDEX_NAME_BUILDER, + "org.apache.flume.sink.elasticsearch.FakeIndexNameBuilder"); + + assertNull(fixture.getIndexNameBuilder()); + fixture.configure(context); + assertTrue(fixture.getIndexNameBuilder() instanceof FakeIndexNameBuilder); + } + + public static class FakeConfigurable implements Configurable { + @Override + public void configure(Context arg0) { + // no-op + } + } +} + +/** + * Internal class. Fake event serializer used for tests + */ +class FakeEventSerializer implements ElasticSearchEventSerializer { + + static final byte[] FAKE_BYTES = new byte[] { 9, 8, 7, 6 }; + boolean configuredWithContext; + boolean configuredWithComponentConfiguration; + + @Override + public BytesStream getContentBuilder(Event event) throws IOException { + FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4); + fbaos.write(FAKE_BYTES); + return fbaos; + } + + @Override + public void configure(Context arg0) { + configuredWithContext = true; + } + + @Override + public void configure(ComponentConfiguration arg0) { + configuredWithComponentConfiguration = true; + } +} + +/** + * Internal class. Fake index name builder used only for tests. + */ +class FakeIndexNameBuilder implements IndexNameBuilder { + + static final String INDEX_NAME = "index_name"; + + @Override + public String getIndexName(Event event) { + return INDEX_NAME; + } + + @Override + public String getIndexPrefix(Event event) { + return INDEX_NAME; + } + + @Override + public void configure(Context context) { + } + + @Override + public void configure(ComponentConfiguration conf) { + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java new file mode 100644 index 0000000..2a36439 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.FlumeException; +import org.apache.flume.Sink; +import org.apache.flume.SinkFactory; +import org.apache.flume.sink.DefaultSinkFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestElasticSearchSinkCreation { + + private SinkFactory sinkFactory; + + @Before + public void setUp() { + sinkFactory = new DefaultSinkFactory(); + } + + private void verifySinkCreation(String name, String type, + Class typeClass) throws FlumeException { + Sink sink = sinkFactory.create(name, type); + Assert.assertNotNull(sink); + Assert.assertTrue(typeClass.isInstance(sink)); + } + + @Test + public void testSinkCreation() { + verifySinkCreation("elasticsearch-sink", "elasticsearch", ElasticSearchSink.class); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java new file mode 100644 index 0000000..678342a --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilderTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.event.SimpleEvent; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TimeBasedIndexNameBuilderTest { + + private TimeBasedIndexNameBuilder indexNameBuilder; + + @Before + public void setUp() throws Exception { + Context context = new Context(); + context.put(ElasticSearchSinkConstants.INDEX_NAME, "prefix"); + indexNameBuilder = new TimeBasedIndexNameBuilder(); + indexNameBuilder.configure(context); + } + + @Test + public void shouldUseUtcAsBasisForDateFormat() { + assertEquals("Coordinated Universal Time", + indexNameBuilder.getFastDateFormat().getTimeZone().getDisplayName()); + } + + @Test + public void indexNameShouldBePrefixDashFormattedTimestamp() { + long time = 987654321L; + Event event = new SimpleEvent(); + Map headers = new HashMap(); + headers.put("timestamp", Long.toString(time)); + event.setHeaders(headers); + assertEquals("prefix-" + indexNameBuilder.getFastDateFormat().format(time), + indexNameBuilder.getIndexName(event)); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java new file mode 100644 index 0000000..bef2ac6 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TimestampedEventTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import com.google.common.collect.Maps; +import org.apache.flume.event.SimpleEvent; +import org.joda.time.DateTimeUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TimestampedEventTest { + static final long FIXED_TIME_MILLIS = 123456789L; + + @Before + public void setFixedJodaTime() { + DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS); + } + + @Test + public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp()); + assertEquals(String.valueOf(FIXED_TIME_MILLIS), + timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("timestamp", "-321"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-321L, timestampedEvent.getTimestamp()); + assertEquals("-321", timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("@timestamp", "-999"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals(-999L, timestampedEvent.getTimestamp()); + assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp")); + assertNull(timestampedEvent.getHeaders().get("timestamp")); + } + + @Test + public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() { + SimpleEvent base = new SimpleEvent(); + base.setBody(new byte[] {1,2,3,4}); + Map headersWithTimestamp = Maps.newHashMap(); + headersWithTimestamp.put("foo", "bar"); + base.setHeaders(headersWithTimestamp ); + + TimestampedEvent timestampedEvent = new TimestampedEvent(base); + assertEquals("bar", timestampedEvent.getHeaders().get("foo")); + assertArrayEquals(base.getBody(), timestampedEvent.getBody()); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java new file mode 100644 index 0000000..0d1d092 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2014 Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flume.sink.elasticsearch.client; + +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class RoundRobinListTest { + + private RoundRobinList fixture; + + @Before + public void setUp() { + fixture = new RoundRobinList(Arrays.asList("test1", "test2")); + } + + @Test + public void shouldReturnNextElement() { + assertEquals("test1", fixture.get()); + assertEquals("test2", fixture.get()); + assertEquals("test1", fixture.get()); + assertEquals("test2", fixture.get()); + assertEquals("test1", fixture.get()); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java new file mode 100644 index 0000000..c3f07b0 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchClientFactory { + + ElasticSearchClientFactory factory; + + @Mock + ElasticSearchEventSerializer serializer; + + @Before + public void setUp() { + initMocks(this); + factory = new ElasticSearchClientFactory(); + } + + @Test + public void shouldReturnTransportClient() throws Exception { + String[] hostNames = { "127.0.0.1" }; + Object o = factory.getClient(ElasticSearchClientFactory.TransportClient, + hostNames, "test", serializer, null); + assertThat(o, instanceOf(ElasticSearchTransportClient.class)); + } + + @Test + public void shouldReturnRestClient() throws NoSuchClientTypeException { + String[] hostNames = { "127.0.0.1" }; + Object o = factory.getClient(ElasticSearchClientFactory.RestClient, + hostNames, "test", serializer, null); + assertThat(o, instanceOf(ElasticSearchRestClient.class)); + } + + @Test(expected = NoSuchClientTypeException.class) + public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException { + String[] hostNames = { "127.0.0.1" }; + factory.getClient("not_existing_client", hostNames, "test", null, null); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java new file mode 100644 index 0000000..9551c81 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import com.google.common.base.Splitter; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.BytesStream; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchRestClient { + + private ElasticSearchRestClient fixture; + + @Mock + private ElasticSearchEventSerializer serializer; + + @Mock + private IndexNameBuilder nameBuilder; + + @Mock + private Event event; + + @Mock + private HttpClient httpClient; + + @Mock + private HttpResponse httpResponse; + + @Mock + private StatusLine httpStatus; + + @Mock + private HttpEntity httpEntity; + + private static final String INDEX_NAME = "foo_index"; + private static final String MESSAGE_CONTENT = "{\"body\":\"test\"}"; + private static final String[] HOSTS = {"host1", "host2"}; + + @Before + public void setUp() throws IOException { + initMocks(this); + BytesReference bytesReference = mock(BytesReference.class); + BytesStream bytesStream = mock(BytesStream.class); + + when(nameBuilder.getIndexName(any(Event.class))).thenReturn(INDEX_NAME); + when(bytesReference.toBytesArray()).thenReturn(new BytesArray(MESSAGE_CONTENT)); + when(bytesStream.bytes()).thenReturn(bytesReference); + when(serializer.getContentBuilder(any(Event.class))).thenReturn(bytesStream); + fixture = new ElasticSearchRestClient(HOSTS, serializer, httpClient); + } + + @Test + public void shouldAddNewEventWithoutTTL() throws Exception { + ArgumentCaptor argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", -1); + fixture.execute(); + + verify(httpClient).execute(isA(HttpUriRequest.class)); + verify(httpClient).execute(argument.capture()); + + assertEquals("http://host1/_bulk", argument.getValue().getURI().toString()); + assertTrue(verifyJsonEvents("{\"index\":{\"_type\":\"bar_type\", \"_index\":\"foo_index\"}}\n", + MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity()))); + } + + @Test + public void shouldAddNewEventWithTTL() throws Exception { + ArgumentCaptor argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + + verify(httpClient).execute(isA(HttpUriRequest.class)); + verify(httpClient).execute(argument.capture()); + + assertEquals("http://host1/_bulk", argument.getValue().getURI().toString()); + assertTrue(verifyJsonEvents( + "{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n", + MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity()))); + } + + private boolean verifyJsonEvents(String expectedIndex, String expectedBody, String actual) { + Iterator it = Splitter.on("\n").split(actual).iterator(); + JsonParser parser = new JsonParser(); + JsonObject[] arr = new JsonObject[2]; + for (int i = 0; i < 2; i++) { + arr[i] = (JsonObject) parser.parse(it.next()); + } + return arr[0].equals(parser.parse(expectedIndex)) && arr[1].equals(parser.parse(expectedBody)); + } + + @Test(expected = EventDeliveryException.class) + public void shouldThrowEventDeliveryException() throws Exception { + ArgumentCaptor argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + } + + @Test() + public void shouldRetryBulkOperation() throws Exception { + ArgumentCaptor argument = ArgumentCaptor.forClass(HttpPost.class); + + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, + HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(httpStatus); + when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); + + fixture.addEvent(event, nameBuilder, "bar_type", 123); + fixture.execute(); + + verify(httpClient, times(2)).execute(isA(HttpUriRequest.class)); + verify(httpClient, times(2)).execute(argument.capture()); + + List allValues = argument.getAllValues(); + assertEquals("http://host1/_bulk", allValues.get(0).getURI().toString()); + assertEquals("http://host2/_bulk", allValues.get(1).getURI().toString()); + } +} diff --git a/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java new file mode 100644 index 0000000..b7b8e74 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchTransportClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch.client; + +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; +import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.BytesStream; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class TestElasticSearchTransportClient { + + private ElasticSearchTransportClient fixture; + + @Mock + private ElasticSearchEventSerializer serializer; + + @Mock + private IndexNameBuilder nameBuilder; + + @Mock + private Client elasticSearchClient; + + @Mock + private BulkRequestBuilder bulkRequestBuilder; + + @Mock + private IndexRequestBuilder indexRequestBuilder; + + @Mock + private Event event; + + @Before + public void setUp() throws IOException { + initMocks(this); + BytesReference bytesReference = mock(BytesReference.class); + BytesStream bytesStream = mock(BytesStream.class); + + when(nameBuilder.getIndexName(any(Event.class))).thenReturn("foo_index"); + when(bytesReference.toBytes()).thenReturn("{\"body\":\"test\"}".getBytes()); + when(bytesStream.bytes()).thenReturn(bytesReference); + when(serializer.getContentBuilder(any(Event.class))) + .thenReturn(bytesStream); + when(elasticSearchClient.prepareIndex(anyString(), anyString())) + .thenReturn(indexRequestBuilder); + when(indexRequestBuilder.setSource(bytesReference)).thenReturn( + indexRequestBuilder); + + fixture = new ElasticSearchTransportClient(elasticSearchClient, serializer); + fixture.setBulkRequestBuilder(bulkRequestBuilder); + } + + @Test + public void shouldAddNewEventWithoutTTL() throws Exception { + fixture.addEvent(event, nameBuilder, "bar_type", -1); + verify(indexRequestBuilder).setSource( + serializer.getContentBuilder(event).bytes()); + verify(bulkRequestBuilder).add(indexRequestBuilder); + } + + @Test + public void shouldAddNewEventWithTTL() throws Exception { + fixture.addEvent(event, nameBuilder, "bar_type", 10); + verify(indexRequestBuilder).setTTL(10); + verify(indexRequestBuilder).setSource( + serializer.getContentBuilder(event).bytes()); + } + + @Test + public void shouldExecuteBulkRequestBuilder() throws Exception { + ListenableActionFuture action = + (ListenableActionFuture) mock(ListenableActionFuture.class); + BulkResponse response = mock(BulkResponse.class); + when(bulkRequestBuilder.execute()).thenReturn(action); + when(action.actionGet()).thenReturn(response); + when(response.hasFailures()).thenReturn(false); + + fixture.addEvent(event, nameBuilder, "bar_type", 10); + fixture.execute(); + verify(bulkRequestBuilder).execute(); + } + + @Test(expected = EventDeliveryException.class) + public void shouldThrowExceptionOnExecuteFailed() throws Exception { + ListenableActionFuture action = + (ListenableActionFuture) mock(ListenableActionFuture.class); + BulkResponse response = mock(BulkResponse.class); + when(bulkRequestBuilder.execute()).thenReturn(action); + when(action.actionGet()).thenReturn(response); + when(response.hasFailures()).thenReturn(true); + + fixture.addEvent(event, nameBuilder, "bar_type", 10); + fixture.execute(); + } +} diff --git a/flume-elasticsearch-sink/src/test/resources/log4j2.xml b/flume-elasticsearch-sink/src/test/resources/log4j2.xml new file mode 100644 index 0000000..5676916 --- /dev/null +++ b/flume-elasticsearch-sink/src/test/resources/log4j2.xml @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..553aaa6 --- /dev/null +++ b/pom.xml @@ -0,0 +1,277 @@ + + + + + + org.apache.flume + flume-parent + 1.10.0-SNAPSHOT + + + 4.0.0 + org.apache.flume + flume-search + 1.0.0-SNAPSHOT + pom + + Apache Flume Search Components + + + + UTF-8 + + + 1.8 + 1.8 + + 0.90.1 + 1.10.0-SNAPSHOT + + + + flume-elasticsearch-sink + + + 2009 + + + JIRA + https://issues.apache.org/jira/browse/FLUME + + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + http://mail-archives.apache.org/mod_mbox/flume-user/ + Flume User List + user@flume.apache.org + user-subscribe@flume.apache.org + user-unsubscribe@flume.apache.org + + + http://mail-archives.apache.org/mod_mbox/flume-dev/ + Flume Developer List + dev@flume.apache.org + dev-subscribe@flume.apache.org + dev-unsubscribe@flume.apache.org + + + http://mail-archives.apache.org/mod_mbox/flume-commits/ + Flume Commits + commits@flume.apache.org + commits-subscribe@flume.apache.org + commits-unsubscribe@flume.apache.org + + + + + https://git-wip-us.apache.org/repos/asf?p=flume.git;a=tree;h=refs/heads/trunk;hb=trunk + scm:git:https://git-wip-us.apache.org/repos/asf/flume.git + scm:git:http://git-wip-us.apache.org/repos/asf/flume.git + + + + jenkins + https://builds.apache.org/job/flume-trunk + + + + + Ralph Goers + rgoers + rgoers@apache.org + Intuit + + + + + Apache Software Foundation + http://www.apache.org + + + + + apache.staging.https + Apache Staging Repository + https://repository.apache.org/service/local/staging/deploy/maven2/ + + + apache.website + ${siteUrlDeployment} + Flume Site + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + ${mvn-checkstyle-plugin.version} + + + + org.apache.flume + build-support + ${flume.version} + + + + com.puppycrawl.tools + checkstyle + ${checkstyle.tool.version} + + + + + verify.checkstyle + verify + + check + + + + + config/checkstyle/checkstyle.xml + config/checkstyle/checkstyle-suppressions.xml + checkstyle.suppressions.file + UTF-8 + true + false + + + + + + + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + true + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + tests + test + + + + + + org.apache.flume + flume-ng-configuration + ${flume.version} + + + org.apache.flume + flume-ng-config-filter-api + ${flume.version} + + + org.apache.flume + flume-ng-auth + ${flume.version} + + + org.apache.flume + flume-ng-core + ${flume.version} + + + + org.apache.flume + build-support + ${flume.version} + + + org.apache.flume + flume-tools + ${flume.version} + + + + org.apache.flume + flume-ng-node + ${flume.version} + + + + org.apache.flume + flume-ng-sdk + ${flume.version} + + + + org.apache.flume + flume-ng-sdk + ${flume.version} + tests + test + + + + + + + + org.apache.maven.plugins + maven-project-info-reports-plugin + + + + ci-management + distribution-management + team + mailing-lists + issue-management + licenses + scm + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + com.github.spotbugs + spotbugs-maven-plugin + + + org.apache.maven.plugins + maven-pmd-plugin + + + + +