From 1182be14764964c9c277b21339e5f3773f37c2a2 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Tue, 31 Jul 2018 19:24:35 -0700 Subject: [PATCH] [Gobblin-17] Add Elasticsearch writer (rest + transport) --- .../gobblin/writer/AsyncWriterManager.java | 3 + .../java/org/apache/gobblin/writer/Batch.java | 22 +- .../writer/BufferedAsyncDataWriter.java | 4 +- .../gobblin/writer/BytesBoundedBatch.java | 10 +- .../gobblin/writer/LargeMessagePolicy.java | 26 ++ .../writer/RecordTooLargeException.java | 20 ++ .../SequentialBasedBatchAccumulator.java | 65 +++-- .../gobblin-flavor-standard.gradle | 1 + .../src/main/resources/wikipedia-elastic.conf | 64 +++++ .../gobblin-elasticsearch-deps/build.gradle | 50 ++++ .../gobblin-elasticsearch/build.gradle | 76 ++++++ .../scripts/install_test_deps.sh | 40 +++ .../scripts/uninstall_test_deps.sh | 23 ++ .../AvroGenericRecordSerializer.java | 80 ++++++ .../AvroGenericRecordTypeMapper.java | 71 +++++ .../typemapping/FieldMappingException.java | 35 +++ .../typemapping/GsonJsonSerializer.java | 52 ++++ .../typemapping/JsonSerializer.java | 30 +++ .../typemapping/JsonTypeMapper.java | 56 ++++ .../typemapping/SerializationException.java | 31 +++ .../elasticsearch/typemapping/TypeMapper.java | 36 +++ .../ElasticsearchDataWriterBuilder.java | 83 ++++++ .../writer/ElasticsearchRestWriter.java | 232 +++++++++++++++++ .../ElasticsearchTransportClientWriter.java | 118 +++++++++ .../writer/ElasticsearchWriterBase.java | 168 ++++++++++++ .../ElasticsearchWriterConfigurationKeys.java | 71 +++++ .../elasticsearch/writer/ExceptionLogger.java | 26 ++ .../writer/FutureCallbackHolder.java | 193 ++++++++++++++ .../writer/MalformedDocPolicy.java | 26 ++ .../ElasticsearchTestServer.java | 217 ++++++++++++++++ .../ElasticsearchTestServerTest.java | 50 ++++ .../elasticsearch/writer/ConfigBuilder.java | 72 ++++++ ...lasticsearchTransportClientWriterTest.java | 54 ++++ .../writer/ElasticsearchWriterBaseTest.java | 113 ++++++++ .../ElasticsearchWriterIntegrationTest.java | 243 ++++++++++++++++++ .../writer/RestWriterVariant.java | 97 +++++++ .../elasticsearch/writer/TestClient.java | 37 +++ .../writer/TransportWriterVariant.java | 96 +++++++ .../elasticsearch/writer/WriterVariant.java | 40 +++ .../gobblin/test/AvroRecordGenerator.java | 104 ++++++++ .../gobblin/test/JsonRecordGenerator.java | 75 ++++++ .../org/apache/gobblin/test/PayloadType.java | 27 ++ .../gobblin/test/RecordTypeGenerator.java | 43 ++++ .../eventhub/writer/EventhubBatchTest.java | 35 +-- .../org/apache/gobblin/test/TestUtils.java | 21 ++ gradle/scripts/globalDependencies.gradle | 30 ++- 46 files changed, 3005 insertions(+), 61 deletions(-) create mode 100644 gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java create mode 100644 gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java create mode 100644 gobblin-example/src/main/resources/wikipedia-elastic.conf create mode 100644 gobblin-modules/gobblin-elasticsearch-deps/build.gradle create mode 100644 gobblin-modules/gobblin-elasticsearch/build.gradle create mode 100755 gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh create mode 100755 gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java create mode 100644 
gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java create mode 100644 
gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java create mode 100644 gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java index 2be89c66a2b..a59975382e3 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java @@ -340,6 +340,8 @@ public void onFailure(Throwable throwable) { .update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS); } if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1 + log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(), + attempt.getRecord().toString()); attempt.incAttempt(); attempt.setPrevAttemptFailure(throwable); AsyncWriterManager.this.retryQueue.get().add(attempt); @@ -391,6 +393,7 @@ public void run() { Attempt attempt = this.retryQueue.take(); if (attempt != null) { maybeSleep(attempt.getPrevAttemptTimestampNanos()); + log.debug("Retry thread will retry record: {}", attempt.getRecord().toString()); attemptWrite(attempt); } } catch (InterruptedException e) { diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java index ff16590abd8..faf815c26a4 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/Batch.java @@ -127,17 +127,18 @@ public void onFailure (Throwable throwable) { /** * A method to check if the batch has the room to add a new record * @param record: record needs to be added + * @param largeMessagePolicy: the policy that is in effect for large messages * @return Indicates if this batch still have enough space to hold a new record */ - public abstract boolean hasRoom (D record); + public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy); /** * Add a record to this batch *

* Implementation of this method should always ensure the record can be added successfully - The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is this method + The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is that this method * is responsible for adding record to internal batch memory and the check for the room space is performed - by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should + by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should * already be resolved before this method is invoked. *

* @@ -162,14 +163,19 @@ public void onFailure (Throwable throwable) { * * @param record : record needs to be added * @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged + * @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch * @return A future object which contains {@link RecordMetadata} */ - public Future tryAppend(D record, WriteCallback callback) { - if (!hasRoom(record)) { - LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes"); + public Future tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy) + throws RecordTooLargeException { + if (!hasRoom(record, largeMessagePolicy)) { + LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes", + record.toString(), getCurrentSizeInByte()); + if (largeMessagePolicy == LargeMessagePolicy.FAIL) { + throw new RecordTooLargeException(); + } return null; } - this.append(record); thunks.add(new Thunk(callback, getRecordSizeInByte(record))); RecordFuture future = new RecordFuture(latch, recordCount); @@ -178,7 +184,9 @@ public Future tryAppend(D record, WriteCallback callback) { } public void await() throws InterruptedException{ + LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount); this.latch.await(); + LOG.debug("Batch {} done with {} records", this.id, this.recordCount); } } diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java index ceaffecd583..87039b6c1d8 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BufferedAsyncDataWriter.java @@ -39,7 +39,7 @@ * @param data record type */ @Alpha -public abstract class BufferedAsyncDataWriter implements AsyncDataWriter { +public class BufferedAsyncDataWriter implements AsyncDataWriter { private RecordProcessor processor; private BatchAccumulator accumulator; @@ -136,7 +136,7 @@ private WriteCallback createBatchCallback (final Batch batch) { return new WriteCallback() { @Override public void onSuccess(WriteResponse writeResponse) { - LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size()); + LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size()); batch.onSuccess(writeResponse); batch.done(); accumulator.deallocate(batch); diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java index ef638824ba5..7b6b4dc1cc5 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/BytesBoundedBatch.java @@ -62,7 +62,11 @@ void append (D record) { records.add(record); } - boolean hasRoom (D record) { + boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) { + if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) { + // there is always space for one record, no matter how big :) + return true; + } long recordLen = BytesBoundedBatch.this.getInternalSize(record); return (byteSize + recordLen) <= 
BytesBoundedBatch.this.memSizeLimit; } @@ -80,8 +84,8 @@ public List getRecords() { return memory.getRecords(); } - public boolean hasRoom (D object) { - return memory.hasRoom(object); + public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) { + return memory.hasRoom(object, largeMessagePolicy); } public void append (D object) { diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java new file mode 100644 index 00000000000..28ca94940cf --- /dev/null +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/LargeMessagePolicy.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.writer; + +/** + * Describes how single messages that are larger than a batch message limit should be treated + */ +public enum LargeMessagePolicy { + DROP, // drop (and log) messages that exceed the threshold + ATTEMPT, // attempt to deliver messages that exceed the threshold + FAIL // throw an error when this happens +} diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java new file mode 100644 index 00000000000..845e6a84534 --- /dev/null +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/RecordTooLargeException.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.writer; + +public class RecordTooLargeException extends Exception { +} diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java index 9b7a608c6ce..58b09422c19 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/SequentialBasedBatchAccumulator.java @@ -43,14 +43,16 @@ * keeps in the deque until a TTL is expired. */ -public abstract class SequentialBasedBatchAccumulator extends BatchAccumulator { +public class SequentialBasedBatchAccumulator extends BatchAccumulator { + private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL; private Deque> dq = new LinkedList<>(); private IncompleteRecordBatches incomplete = new IncompleteRecordBatches(); private final long batchSizeLimit; private final long memSizeLimit; private final double tolerance = 0.95; private final long expireInMilliSecond; + private final LargeMessagePolicy largeMessagePolicy; private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class); private final ReentrantLock dqLock = new ReentrantLock(); @@ -63,24 +65,31 @@ public SequentialBasedBatchAccumulator() { } public SequentialBasedBatchAccumulator(Properties properties) { - Config config = ConfigUtils.propertiesToConfig(properties); - this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE, - Batch.BATCH_SIZE_DEFAULT); - - this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL, - Batch.BATCH_TTL_DEFAULT); - - this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY, - Batch.BATCH_QUEUE_CAPACITY_DEFAULT); + this(ConfigUtils.propertiesToConfig(properties)); + } - this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit); + public SequentialBasedBatchAccumulator(Config config) { + this(ConfigUtils.getLong(config, Batch.BATCH_SIZE, + Batch.BATCH_SIZE_DEFAULT), + ConfigUtils.getLong(config, Batch.BATCH_TTL, + Batch.BATCH_TTL_DEFAULT), + ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY, + Batch.BATCH_QUEUE_CAPACITY_DEFAULT)); } public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) { + this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY); + } + + public SequentialBasedBatchAccumulator(long batchSizeLimit, + long expireInMilliSecond, + long capacity, + LargeMessagePolicy largeMessagePolicy) { this.batchSizeLimit = batchSizeLimit; this.expireInMilliSecond = expireInMilliSecond; this.capacity = capacity; this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit); + this.largeMessagePolicy = largeMessagePolicy; } public long getNumOfBatches () { @@ -101,7 +110,12 @@ public final Future enqueue (D record, WriteCallback callback) t try { BytesBoundedBatch last = dq.peekLast(); if (last != null) { - Future future = last.tryAppend(record, callback); + Future future = null; + try { + future = last.tryAppend(record, callback, this.largeMessagePolicy); + } catch (RecordTooLargeException e) { + // Ok if the record was too large for the current batch + } if (future != null) { return future; } @@ -110,12 +124,18 @@ public final Future enqueue (D record, WriteCallback callback) t // Create a new batch because previous one has no space BytesBoundedBatch batch = new 
BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond); LOG.debug("Batch " + batch.getId() + " is generated"); - Future future = batch.tryAppend(record, callback); + Future future = null; + try { + future = batch.tryAppend(record, callback, this.largeMessagePolicy); + } catch (RecordTooLargeException e) { + // If a new batch also wasn't able to accomodate the new message + throw new RuntimeException("Failed due to a message that was too large", e); + } - // Even single record can exceed the batch size limit - // Ignore the record because Eventhub can only accept payload less than 256KB + // The future might be null, since the largeMessagePolicy might be set to DROP if (future == null) { - LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: " + assert largeMessagePolicy.equals(LargeMessagePolicy.DROP); + LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: " + record); future = Futures.immediateFuture(new RecordMetadata(0)); callback.onSuccess(WriteResponse.EMPTY); @@ -124,6 +144,7 @@ public final Future enqueue (D record, WriteCallback callback) t // if queue is full, we should not add more while (dq.size() >= this.capacity) { + LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity); this.notFull.await(); } dq.addLast(batch); @@ -187,7 +208,7 @@ public Batch getNextAvailableBatch () { return dq.poll(); } else { while (dq.size() == 0) { - LOG.info ("ready to sleep because of queue is empty"); + LOG.debug ("ready to sleep because of queue is empty"); SequentialBasedBatchAccumulator.this.notEmpty.await(); if (SequentialBasedBatchAccumulator.this.isClosed()) { return dq.poll(); @@ -203,7 +224,7 @@ public Batch getNextAvailableBatch () { if (dq.size() == 1) { if (dq.peekFirst().isTTLExpire()) { - LOG.info ("Batch " + dq.peekFirst().getId() + " is expired"); + LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired"); BytesBoundedBatch candidate = dq.poll(); SequentialBasedBatchAccumulator.this.notFull.signal(); return candidate; @@ -240,12 +261,16 @@ public void close() { public void flush() { try { ArrayList batches = this.incomplete.all(); - LOG.info ("flush on {} batches", batches.size()); + int numOutstandingRecords = 0; + for (Batch batch: batches) { + numOutstandingRecords += batch.getRecords().size(); + } + LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords); for (Batch batch: batches) { batch.await(); } } catch (Exception e) { - LOG.info ("Error happens when flushing"); + LOG.error ("Error happened while flushing batches"); } } diff --git a/gobblin-distribution/gobblin-flavor-standard.gradle b/gobblin-distribution/gobblin-flavor-standard.gradle index c2061a50218..2f544cadb6c 100644 --- a/gobblin-distribution/gobblin-flavor-standard.gradle +++ b/gobblin-distribution/gobblin-flavor-standard.gradle @@ -21,4 +21,5 @@ dependencies { compile project(':gobblin-modules:gobblin-crypto-provider') compile project(':gobblin-modules:gobblin-kafka-08') compile project(':gobblin-modules:google-ingestion') + compile project(':gobblin-modules:gobblin-elasticsearch') } diff --git a/gobblin-example/src/main/resources/wikipedia-elastic.conf b/gobblin-example/src/main/resources/wikipedia-elastic.conf new file mode 100644 index 00000000000..9db386e31c7 --- /dev/null +++ b/gobblin-example/src/main/resources/wikipedia-elastic.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# 
contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A sample pull file that pulls a few revisions of a Wikipedia page +# and writes them to an Elasticsearch index +job { + name=PullFromWikipediaToElasticSearch + group=Wikipedia + description=Pull from Wikipedia and write to ElasticSearch +} + +task.maxretries=0 + +source { + class=org.apache.gobblin.example.wikipedia.WikipediaSource + page.titles="Wikipedia:Sandbox" + revisions.cnt=5 +} + +wikipedia { + api.rooturl="https://en.wikipedia.org/w/api.php" + avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\", \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\", \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\", \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\", \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}" +} +converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter +extract.namespace=org.apache.gobblin.example.wikipedia + +writer { + builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder + elasticsearch { + client.type=REST + index.name=wikipedia-test + index.type=docs + #hosts=hostname + #ssl { + # enabled=true + # keystoreType=pkcs12 + # keystorePassword=change_me + # keystoreLocation=/path/to/.p12 file + # truststoreType=jks + # truststoreLocation=/path/to/cacerts + # truststorePassword=changeme + #} + typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper + useIdFromData=false # change to true if you want to use a field from the record as the id field + #idFieldName=id # change to the field of the record that you want to use as the id of the document + } +} + +data.publisher.type=org.apache.gobblin.publisher.NoopPublisher + diff --git a/gobblin-modules/gobblin-elasticsearch-deps/build.gradle b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle new file mode 100644 index 00000000000..35e9a3630aa --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch-deps/build.gradle @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +buildscript { + repositories { + jcenter() + } + dependencies { + classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4' + } +} + +apply plugin: 'com.github.johnrengelman.shadow' +apply plugin: 'java' + +dependencies { + compile "org.elasticsearch.client:transport:5.6.8" + compile "org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.8" + compile "com.google.guava:guava:18.0" +} + + +configurations { + compile { + exclude group: "org.apache.hadoop" + exclude group: "com.sun.jersey.contribs" + } +} + +shadowJar { + zip64 true + relocate 'com.google.common', 'shadow.gobblin.elasticsearch.com.google.common' +} + +ext.classification="library" diff --git a/gobblin-modules/gobblin-elasticsearch/build.gradle b/gobblin-modules/gobblin-elasticsearch/build.gradle new file mode 100644 index 00000000000..2d624b22a8a --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/build.gradle @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +apply plugin: 'java' + +dependencies { + compile project(":gobblin-api") + compile project(":gobblin-core-base") + compile project(":gobblin-utility") + compile project(":gobblin-metrics-libs:gobblin-metrics") + compile project(path: ":gobblin-modules:gobblin-elasticsearch-deps", configuration:"shadow") + + compile "org.apache.logging.log4j:log4j-to-slf4j:2.7" + compile "org.slf4j:slf4j-api:1.7.21" + compile externalDependency.avro + compile externalDependency.jacksonCore + compile externalDependency.jacksonMapper + compile externalDependency.commonsHttpClient + compile externalDependency.commonsPool + compile externalDependency.commonsLang3 + compile externalDependency.slf4j + compile externalDependency.httpclient + compile externalDependency.httpcore + compile externalDependency.lombok + compile externalDependency.metricsCore + compile externalDependency.typesafeConfig + compile externalDependency.findBugsAnnotations + + testCompile project(":gobblin-runtime") + testCompile project(":gobblin-test-utils") + testCompile externalDependency.jsonAssert + testCompile externalDependency.mockito + testCompile externalDependency.testng +} + +task installTestDependencies(type:Exec) { + workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/" + commandLine './scripts/install_test_deps.sh' +} + +task uninstallTestDependencies(type: Exec) { + workingDir "${project.rootDir}/gobblin-modules/gobblin-elasticsearch/" + commandLine './scripts/uninstall_test_deps.sh' + +} + +test.dependsOn installTestDependencies +test.finalizedBy uninstallTestDependencies + +configurations { + compile { + transitive = false + } +} + +test { + workingDir rootProject.rootDir + maxParallelForks = 4 +} + + +ext.classification="library" diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh new file mode 100755 index 00000000000..48324da30f1 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/scripts/install_test_deps.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +TARGET_DIR="test-elasticsearch" +ES_VERSION=5.6.8 +ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION} +echo ${TARGET_DIR} +mkdir -p ${TARGET_DIR} + + +ES_TAR=${TARGET_DIR}/elasticsearch-${ES_VERSION}.tar.gz +ES_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz +echo ${ES_URL} +echo ${ES_TAR} +if [ -d $ES_DIR ]; +then + echo "Skipping download since version already found at ${ES_DIR}" + echo "Cleaning up directory" + rm -rf ${TARGET_DIR}/elasticsearch-${ES_VERSION} +else + echo "$ES_DIR does not exist, downloading" + curl -o ${ES_TAR} ${ES_URL} +fi +tar -xzf ${ES_TAR} -C ${TARGET_DIR} diff --git a/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh new file mode 100755 index 00000000000..db79f8664c4 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/scripts/uninstall_test_deps.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +TARGET_DIR="test-elasticsearch" +ES_VERSION=5.6.8 +ES_DIR=${TARGET_DIR}/elasticsearch-${ES_VERSION} +rm -rf ${TARGET_DIR} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java new file mode 100644 index 00000000000..52422021399 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordSerializer.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.output.ByteArrayOutputStream; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A {@link JsonSerializer} for {@link GenericRecord} objects. + */ +@Slf4j +public class AvroGenericRecordSerializer implements JsonSerializer { + + private final ByteArrayOutputStream byteArrayOutputStream; + private final DataOutputStream out; + private final GenericDatumWriter writer; + private final Closer closer; + + + public AvroGenericRecordSerializer() { + this.closer =Closer.create(); + this.byteArrayOutputStream = new ByteArrayOutputStream(); + this.out = this.closer.register(new DataOutputStream(this.byteArrayOutputStream)); + this.writer = new GenericDatumWriter(); + } + + @Override + public void configure(Config config) { + + } + + @Override + public synchronized byte[] serializeToJson(GenericRecord serializable) + throws SerializationException { + try { + /** + * We use the toString method of Avro to flatten the JSON for optional nullable types. + * Otherwise the JSON has an additional level of nesting to encode the type. + * e.g. "id": {"string": "id-value"} versus "id": "id-value" + * See {@link: https://issues.apache.org/jira/browse/AVRO-1582} for a good discussion on this. + */ + String serialized = serializable.toString(); + return serialized.getBytes(Charset.forName("UTF-8")); + + } catch (Exception exception) { + throw new SerializationException("Could not serializeToJson Avro record", exception); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java new file mode 100644 index 00000000000..0586f3c73ea --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/AvroGenericRecordTypeMapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.IOException; + +import org.apache.avro.generic.GenericRecord; + +import com.google.common.io.Closer; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +/** + * A TypeMapper for Avro GenericRecords. 
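+ * <p>
+ * Illustrative usage sketch (the {@code config}, {@code record}, and the {@code "id"} field name
+ * below are assumed examples, not part of this patch):
+ * <pre>{@code
+ *   TypeMapper<GenericRecord> mapper = new AvroGenericRecordTypeMapper();
+ *   mapper.configure(config);                                      // pass the writer config through
+ *   String docId = mapper.getValue("id", record);                  // extract a document id from the record
+ *   byte[] json = mapper.getSerializer().serializeToJson(record);  // JSON payload for indexing
+ * }</pre>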
+ */ +@Slf4j +public class AvroGenericRecordTypeMapper implements TypeMapper { + + private final JsonSerializer serializer; + private final Closer closer; + + public AvroGenericRecordTypeMapper() { + this.closer =Closer.create(); + this.serializer = this.closer.register(new AvroGenericRecordSerializer()); + } + + @Override + public void configure(Config config) { + this.serializer.configure(config); + log.info("AvroGenericRecordTypeMapper successfully configured"); + } + + @Override + public JsonSerializer getSerializer() { + return this.serializer; + } + + @Override + public String getValue(String fieldName, GenericRecord record) + throws FieldMappingException { + try { + Object idValue = record.get(fieldName); + return idValue.toString(); + } + catch (Exception e) { + throw new FieldMappingException("Could not find field " + fieldName, e); + } + } + + @Override + public void close() + throws IOException { + this.closer.close(); + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java new file mode 100644 index 00000000000..781f918cc20 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/FieldMappingException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +/** + * An exception for type mapping errors during field-based access + */ +public class FieldMappingException extends Exception { + + public FieldMappingException(Exception e) { + super(e); + } + + public FieldMappingException(String message, Exception e) { + super(message, e); + } + + public FieldMappingException(String message) { + super(message); + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java new file mode 100644 index 00000000000..d44986c1b84 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/GsonJsonSerializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import com.google.gson.Gson; +import com.typesafe.config.Config; + + +/** + * A Gson based Json Serializer + */ +public class GsonJsonSerializer implements JsonSerializer { + private final Gson _gson = new Gson(); + + @Override + public void configure(Config config) { + + } + + @Override + public byte[] serializeToJson(Object serializable) + throws SerializationException { + String jsonString = _gson.toJson(serializable); + try { + return jsonString.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new SerializationException(e); + } + } + + @Override + public void close() + throws IOException { + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java new file mode 100644 index 00000000000..41f28851b9c --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonSerializer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.Closeable; + +import com.typesafe.config.Config; + + +public interface JsonSerializer extends Closeable { + + void configure(Config config); + + byte[] serializeToJson(T serializable) throws SerializationException; + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java new file mode 100644 index 00000000000..8491147d322 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/JsonTypeMapper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.IOException; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.typesafe.config.Config; + + +public class JsonTypeMapper implements TypeMapper { + + private final JsonSerializer serializer = new GsonJsonSerializer(); + @Override + public void configure(Config config) { + + } + + @Override + public JsonSerializer getSerializer() { + return serializer; + } + + @Override + public String getValue(String fieldName, JsonElement record) + throws FieldMappingException { + assert record.isJsonObject(); + JsonObject jsonObject = record.getAsJsonObject(); + if (jsonObject.has(fieldName)) { + return jsonObject.get(fieldName).getAsString(); + } else { + throw new FieldMappingException("Could not find field :" + fieldName); + } + } + + @Override + public void close() + throws IOException { + + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java new file mode 100644 index 00000000000..d2edb53a8b8 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/SerializationException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +/** + * A class to hold exceptions thrown by {@link JsonSerializer}s. 
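+ * Implementations of {@link JsonSerializer#serializeToJson(Object)} raise it when a record cannot
+ * be converted into a JSON byte payload.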
+ */ +public class SerializationException extends Exception { + public SerializationException(Exception e) { + super(e); + } + + public SerializationException(String s, Exception exception) { + super(s, exception); + } +} + diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java new file mode 100644 index 00000000000..5aa909b76df --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/typemapping/TypeMapper.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.typemapping; + +import java.io.Closeable; + +import com.typesafe.config.Config; + + +/** + * An interface that enables the ElasticSearch writer to work with different types of records. + * Supports serialization and id-getter capabilities + */ +public interface TypeMapper extends Closeable { + + void configure(Config config); + + JsonSerializer getSerializer(); + + String getValue(String fieldName, T record) throws FieldMappingException; + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java new file mode 100644 index 00000000000..cb6ed150d69 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchDataWriterBuilder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.AsyncWriterManager; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.BufferedAsyncDataWriter; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.DataWriterBuilder; +import org.apache.gobblin.writer.SequentialBasedBatchAccumulator; + +import com.google.gson.JsonObject; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.State; + +public class ElasticsearchDataWriterBuilder extends DataWriterBuilder { + + @Override + public DataWriter build() throws IOException { + + State state = this.destination.getProperties(); + Properties taskProps = state.getProperties(); + Config config = ConfigUtils.propertiesToConfig(taskProps); + + SequentialBasedBatchAccumulator batchAccumulator = new SequentialBasedBatchAccumulator<>(taskProps); + + BatchAsyncDataWriter asyncDataWriter; + switch (ElasticsearchWriterConfigurationKeys.ClientType.valueOf( + ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT).toUpperCase())) { + case REST: { + asyncDataWriter = new ElasticsearchRestWriter(config); + break; + } + case TRANSPORT: { + asyncDataWriter = new ElasticsearchTransportClientWriter(config); + break; + } + default: { + throw new IllegalArgumentException("Need to specify which " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE + + " client to use (rest/transport)"); + } + } + BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, asyncDataWriter); + + double failureAllowance = ConfigUtils.getDouble(config, ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG, + ElasticsearchWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0; + boolean retriesEnabled = ConfigUtils.getBoolean(config, ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED, + ElasticsearchWriterConfigurationKeys.RETRIES_ENABLED_DEFAULT); + int maxRetries = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.MAX_RETRIES, + ElasticsearchWriterConfigurationKeys.MAX_RETRIES_DEFAULT); + + + return AsyncWriterManager.builder() + .failureAllowanceRatio(failureAllowance) + .retriesEnabled(retriesEnabled) + .numRetries(maxRetries) + .config(config) + .asyncDataWriter(bufferedAsyncDataWriter) + .build(); + } +} \ No newline at end of file diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java new file mode 100644 index 00000000000..7cd77daf596 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.elasticsearch.writer; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.password.PasswordManager; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.Batch; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.GenericWriteResponse; +import org.apache.gobblin.writer.WriteCallback; +import org.apache.gobblin.writer.WriteResponse; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.SSLContexts; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentType; + +import com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; + +import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchRestWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter { + + private final RestHighLevelClient client; + private final RestClient lowLevelClient; + + ElasticsearchRestWriter(Config config) + throws IOException { + super(config); + + + int threadCount = ConfigUtils.getInt(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT); + try { + + PasswordManager passwordManager = PasswordManager.getInstance(); + Boolean sslEnabled = ConfigUtils.getBoolean(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT); + if (sslEnabled) { + + // keystore + String keyStoreType = ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT); + String keyStoreFilePassword = passwordManager.readPassword(ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD, "")); + String 
identityFilepath = ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION, ""); + + // truststore + String trustStoreType = ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT); + String trustStoreFilePassword = passwordManager.readPassword(ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD, "")); + String cacertsFilepath = ConfigUtils + .getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION, ""); + String truststoreAbsolutePath = Paths.get(cacertsFilepath).toAbsolutePath().normalize().toString(); + log.info("Truststore absolutePath is:" + truststoreAbsolutePath); + + + this.lowLevelClient = + buildRestClient(this.hostAddresses, threadCount, true, keyStoreType, keyStoreFilePassword, identityFilepath, + trustStoreType, trustStoreFilePassword, cacertsFilepath); + } + else { + this.lowLevelClient = buildRestClient(this.hostAddresses, threadCount); + } + client = new RestHighLevelClient(this.lowLevelClient); + + log.info("Elasticsearch Rest Writer configured successfully with: indexName={}, " + + "indexType={}, idMappingEnabled={}, typeMapperClassName={}, ssl={}", + this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper.getClass().getCanonicalName(), + sslEnabled); + + } catch (Exception e) { + throw new IOException("Failed to instantiate rest elasticsearch client", e); + } + } + + @Override + int getDefaultPort() { + return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_REST_WRITER_DEFAULT_PORT; + } + + + private static RestClient buildRestClient(List hosts, int threadCount) + throws Exception { + return buildRestClient(hosts, threadCount, false, null, null, null, null, null, null); + } + + + //TODO: Support pass through of configuration (e.g. 
timeouts etc) of rest client from above + private static RestClient buildRestClient(List hosts, int threadCount, boolean sslEnabled, + String keyStoreType, String keyStoreFilePassword, String identityFilepath, String trustStoreType, + String trustStoreFilePassword, String cacertsFilepath) throws Exception { + + + HttpHost[] httpHosts = new HttpHost[hosts.size()]; + String scheme = sslEnabled?"https":"http"; + for (int h = 0; h < httpHosts.length; h++) { + InetSocketTransportAddress host = hosts.get(h); + httpHosts[h] = new HttpHost(host.getAddress(), host.getPort(), scheme); + } + + RestClientBuilder builder = RestClient.builder(httpHosts); + + if (sslEnabled) { + log.info("ssl configuration: trustStoreType = {}, cacertsFilePath = {}", trustStoreType, cacertsFilepath); + KeyStore truststore = KeyStore.getInstance(trustStoreType); + FileInputStream trustInputStream = new FileInputStream(cacertsFilepath); + try { + truststore.load(trustInputStream, trustStoreFilePassword.toCharArray()); + } + finally { + trustInputStream.close(); + } + SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(truststore, null); + + log.info("ssl key configuration: keyStoreType = {}, keyFilePath = {}", keyStoreType, identityFilepath); + + KeyStore keystore = KeyStore.getInstance(keyStoreType); + FileInputStream keyInputStream = new FileInputStream(identityFilepath); + try { + keystore.load(keyInputStream, keyStoreFilePassword.toCharArray()); + } + finally { + keyInputStream.close(); + } + sslBuilder.loadKeyMaterial(keystore, keyStoreFilePassword.toCharArray()); + + final SSLContext sslContext = sslBuilder.build(); + builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder + // Set ssl context + .setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier()) + // Configure number of threads for clients + .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build())); + } else { + builder = builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder + // Configure number of threads for clients + .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(threadCount).build())); + } + + // Configure timeouts + builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder + .setConnectionRequestTimeout(0)); // Important, otherwise the client has spurious timeouts + + return builder.build(); + } + + @Override + public Future write(final Batch batch, @Nullable WriteCallback callback) { + + Pair preparedBatch = this.prepareBatch(batch, callback); + try { + client.bulkAsync(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener()); + return preparedBatch.getSecond().getFuture(); + } + catch (Exception e) { + throw new RuntimeException("Caught unexpected exception while calling bulkAsync API", e); + } + } + + + + @Override + public void flush() throws IOException { + + } + + @Override + public void close() throws IOException { + super.close(); + this.lowLevelClient.close(); + } + + @VisibleForTesting + public RestHighLevelClient getRestHighLevelClient() { + return this.client; + } + + @VisibleForTesting + public RestClient getRestLowLevelClient() { + return this.lowLevelClient; + } + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java 
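A sketch of the job properties that the SSL branch of the REST writer above would consume. The key names come from ElasticsearchWriterConfigurationKeys later in this patch, the store types shown are its defaults, and the paths and passwords are placeholder values (passwords are resolved through Gobblin's PasswordManager, as in the constructor above):

  writer.elasticsearch.client.type=REST
  writer.elasticsearch.ssl.enabled=true
  writer.elasticsearch.ssl.keystoreType=pkcs12
  writer.elasticsearch.ssl.keystoreLocation=/path/to/identity.p12
  writer.elasticsearch.ssl.keystorePassword=<keystore-password>
  writer.elasticsearch.ssl.truststoreType=jks
  writer.elasticsearch.ssl.truststoreLocation=/path/to/truststore.jks
  writer.elasticsearch.ssl.truststorePassword=<truststore-password>
  writer.elasticsearch.client.threadPoolSize=5

Leaving writer.elasticsearch.ssl.enabled at its default of false selects the plain-HTTP buildRestClient overload instead.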
new file mode 100644 index 00000000000..bb26fb5b188 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.Batch; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.GenericWriteResponseWrapper; +import org.apache.gobblin.writer.WriteCallback; +import org.apache.gobblin.writer.WriteResponse; +import org.apache.gobblin.writer.WriteResponseFuture; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.transport.client.PreBuiltTransportClient; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +class ElasticsearchTransportClientWriter extends ElasticsearchWriterBase implements BatchAsyncDataWriter { + + private final TransportClient client; + + ElasticsearchTransportClientWriter(Config config) throws UnknownHostException { + super(config); + // Check if ssl is being configured, throw error that transport client does not support ssl + Preconditions.checkArgument(!ConfigUtils.getBoolean(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, false), + "Transport client does not support ssl, try the Rest client instead"); + + this.client = createTransportClient(config); + + log.info("ElasticsearchWriter configured successfully with: indexName={}, indexType={}, idMappingEnabled={}, typeMapperClassName={}", + this.indexName, this.indexType, this.idMappingEnabled, this.typeMapper); + } + + @Override + int getDefaultPort() { + return ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT; + } + + @Override + public 
Future write(Batch batch, @Nullable WriteCallback callback) { + + Pair preparedBatch = this.prepareBatch(batch, callback); + client.bulk(preparedBatch.getFirst(), preparedBatch.getSecond().getActionListener()); + return preparedBatch.getSecond().getFuture(); + + } + + @Override + public void flush() throws IOException { + // Elasticsearch client doesn't support a flush method + } + + @Override + public void close() throws IOException { + log.info("Got a close call in ElasticSearchTransportWriter"); + super.close(); + this.client.close(); + } + + @VisibleForTesting + TransportClient getTransportClient() { + return this.client; + } + + private TransportClient createTransportClient(Config config) throws UnknownHostException { + TransportClient transportClient; + + // Set TransportClient settings + Settings.Builder settingsBuilder = Settings.builder(); + if (config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)) { + settingsBuilder.put(ConfigUtils.configToProperties(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SETTINGS)); + } + settingsBuilder.put("client.transport.ignore_cluster_name",true); + settingsBuilder.put("client.transport.sniff", true); + transportClient = new PreBuiltTransportClient(settingsBuilder.build()); + this.hostAddresses.forEach(transportClient::addTransportAddress); + return transportClient; + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java new file mode 100644 index 00000000000..5238b502e3a --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBase.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.elasticsearch.typemapping.JsonSerializer; +import org.apache.gobblin.elasticsearch.typemapping.TypeMapper; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.writer.Batch; +import org.apache.gobblin.writer.WriteCallback; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentType; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.base.Throwables; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +/** + * A base class for different types of Elasticsearch writers + */ +@Slf4j +public abstract class ElasticsearchWriterBase implements Closeable { + protected final String indexName; + protected final String indexType; + protected final TypeMapper typeMapper; + protected final JsonSerializer serializer; + protected final boolean idMappingEnabled; + protected final String idFieldName; + List hostAddresses; + protected final MalformedDocPolicy malformedDocPolicy; + + ElasticsearchWriterBase(Config config) + throws UnknownHostException { + + this.indexName = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME); + Preconditions.checkNotNull(this.indexName, "Index Name not provided. Please set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME); + Preconditions.checkArgument(this.indexName.equals(this.indexName.toLowerCase()), + "Index name must be lowercase, you provided " + this.indexName); + this.indexType = config.getString(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE); + Preconditions.checkNotNull(this.indexName, "Index Type not provided. 
Please set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE); + this.idMappingEnabled = ConfigUtils.getBoolean(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT); + this.idFieldName = ConfigUtils.getString(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT); + String typeMapperClassName = ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT); + if (typeMapperClassName.isEmpty()) { + throw new IllegalArgumentException(this.getClass().getCanonicalName() + " needs to be configured with " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS + " to enable type mapping"); + } + try { + Class typeMapperClass = (Class) Class.forName(typeMapperClassName); + + this.typeMapper = (TypeMapper) ConstructorUtils.invokeConstructor(typeMapperClass); + this.typeMapper.configure(config); + this.serializer = this.typeMapper.getSerializer(); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { + log.error("Failed to instantiate type-mapper from class " + typeMapperClassName, e); + throw Throwables.propagate(e); + } + + this.malformedDocPolicy = MalformedDocPolicy.valueOf(ConfigUtils.getString(config, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY, + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT)); + + // If list is empty, connect to the default host and port + if (!config.hasPath(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS)) { + InetSocketTransportAddress hostAddress = new InetSocketTransportAddress( + InetAddress.getByName(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_DEFAULT_HOST), + getDefaultPort()); + this.hostAddresses = new ArrayList<>(1); + this.hostAddresses.add(hostAddress); + log.info("Adding host {} to Elasticsearch writer", hostAddress); + } else { + // Get list of hosts + List hosts = ConfigUtils.getStringList(config, ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS); + // Add host addresses + Splitter hostSplitter = Splitter.on(":").trimResults(); + this.hostAddresses = new ArrayList<>(hosts.size()); + for (String host : hosts) { + + List hostSplit = hostSplitter.splitToList(host); + Preconditions.checkArgument(hostSplit.size() == 1 || hostSplit.size() == 2, + "Malformed host name for Elasticsearch writer: " + host + " host names must be of form [host] or [host]:[port]"); + + InetAddress hostInetAddress = InetAddress.getByName(hostSplit.get(0)); + InetSocketTransportAddress hostAddress = null; + + if (hostSplit.size() == 1) { + hostAddress = new InetSocketTransportAddress(hostInetAddress, this.getDefaultPort()); + } else if (hostSplit.size() == 2) { + hostAddress = new InetSocketTransportAddress(hostInetAddress, Integer.parseInt(hostSplit.get(1))); + } + this.hostAddresses.add(hostAddress); + log.info("Adding host {} to Elasticsearch writer", hostAddress); + } + } + } + + abstract int getDefaultPort(); + + + protected Pair prepareBatch(Batch batch, WriteCallback callback) { + BulkRequest bulkRequest = new BulkRequest(); + final StringBuilder stringBuilder = new StringBuilder(); + for 
(Object record : batch.getRecords()) { + try { + byte[] serializedBytes = this.serializer.serializeToJson(record); + log.debug("serialized record: {}", serializedBytes); + IndexRequest indexRequest = new IndexRequest(this.indexName, this.indexType) + .source(serializedBytes, 0, serializedBytes.length, XContentType.JSON); + if (this.idMappingEnabled) { + String id = this.typeMapper.getValue(this.idFieldName, record); + indexRequest.id(id); + stringBuilder.append(";").append(id); + } + bulkRequest.add(indexRequest); + } + catch (Exception e) { + log.error("Encountered exception {}", e); + } + } + FutureCallbackHolder futureCallbackHolder = new FutureCallbackHolder(callback, + exception -> log.error("Batch: {} failed on ids; {} with exception {}", batch.getId(), + stringBuilder.toString(), exception), + this.malformedDocPolicy); + return new Pair(bulkRequest, futureCallbackHolder); + } + + @Override + public void close() throws IOException { + this.serializer.close(); + } + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java new file mode 100644 index 00000000000..0dad29db7c5 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterConfigurationKeys.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + + +public class ElasticsearchWriterConfigurationKeys { + + private static final String ELASTICSEARCH_WRITER_PREFIX = "writer.elasticsearch"; + + private static String prefix(String value) { return ELASTICSEARCH_WRITER_PREFIX + "." 
+ value;}; + + public static final String ELASTICSEARCH_WRITER_SETTINGS = prefix("settings"); + public static final String ELASTICSEARCH_WRITER_HOSTS = prefix("hosts"); + public static final String ELASTICSEARCH_WRITER_INDEX_NAME = prefix("index.name"); + public static final String ELASTICSEARCH_WRITER_INDEX_TYPE = prefix("index.type"); + public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS = prefix("typeMapperClass"); + public static final String ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS_DEFAULT = JsonTypeMapper.class.getCanonicalName(); + public static final String ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED = prefix("useIdFromData"); + public static final Boolean ELASTICSEARCH_WRITER_ID_MAPPING_DEFAULT = false; + public static final String ELASTICSEARCH_WRITER_ID_FIELD = prefix("idFieldName"); + public static final String ELASTICSEARCH_WRITER_ID_FIELD_DEFAULT = "id"; + public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE = prefix("client.type"); + public static final String ELASTICSEARCH_WRITER_CLIENT_TYPE_DEFAULT = "REST"; + public static final String ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_SIZE = prefix("client.threadPoolSize"); + public static final int ELASTICSEARCH_WRITER_CLIENT_THREADPOOL_DEFAULT = 5; + public static final String ELASTICSEARCH_WRITER_SSL_ENABLED=prefix("ssl.enabled"); + public static final boolean ELASTICSEARCH_WRITER_SSL_ENABLED_DEFAULT=false; + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE=prefix("ssl.keystoreType"); + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_TYPE_DEFAULT = "pkcs12"; + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_PASSWORD=prefix("ssl.keystorePassword"); + public static final String ELASTICSEARCH_WRITER_SSL_KEYSTORE_LOCATION=prefix("ssl.keystoreLocation"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE=prefix("ssl.truststoreType"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_TYPE_DEFAULT = "jks"; + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_LOCATION=prefix("ssl.truststoreLocation"); + public static final String ELASTICSEARCH_WRITER_SSL_TRUSTSTORE_PASSWORD=prefix("ssl.truststorePassword"); + public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY = prefix("malformedDocPolicy"); + public static final String ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY_DEFAULT = "FAIL"; + + //Async Writer Configuration + public static final String RETRIES_ENABLED = prefix("retriesEnabled"); + public static final boolean RETRIES_ENABLED_DEFAULT = true; + public static final String MAX_RETRIES = prefix("maxRetries"); + public static final int MAX_RETRIES_DEFAULT = 5; + static final String FAILURE_ALLOWANCE_PCT_CONFIG = prefix("failureAllowancePercentage"); + static final double FAILURE_ALLOWANCE_PCT_DEFAULT = 0.0; + + public enum ClientType { + TRANSPORT, + REST + } + + public static final String ELASTICSEARCH_WRITER_DEFAULT_HOST = "localhost"; + public static final int ELASTICSEARCH_TRANSPORT_WRITER_DEFAULT_PORT = 9300; + public static final int ELASTICSEARCH_REST_WRITER_DEFAULT_PORT = 9200; +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java new file mode 100644 index 00000000000..3a238ad1781 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ExceptionLogger.java @@ -0,0 +1,26 @@ 
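As a usage sketch, combining the keys above with the ElasticsearchDataWriterBuilder from earlier in this patch, a minimal job configuration might look like the following (index and host values are illustrative, and it assumes the standard Gobblin writer.builder.class property for selecting the writer builder):

  writer.builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
  writer.elasticsearch.client.type=REST
  writer.elasticsearch.hosts=localhost:9200
  writer.elasticsearch.index.name=myindex
  writer.elasticsearch.index.type=mydoc
  writer.elasticsearch.typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper
  writer.elasticsearch.useIdFromData=true
  writer.elasticsearch.idFieldName=id
  writer.elasticsearch.malformedDocPolicy=FAIL
  writer.elasticsearch.retriesEnabled=true
  writer.elasticsearch.maxRetries=5
  writer.elasticsearch.failureAllowancePercentage=0.0

Note that the index name must be lowercase and each hosts entry is parsed as [host] or [host]:[port], both enforced by ElasticsearchWriterBase above.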
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +/** + * An interface to log Exceptions + */ +public interface ExceptionLogger { + + void log(Exception exception); + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java new file mode 100644 index 00000000000..f592ffa3fe3 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.writer.GenericWriteResponse; +import org.apache.gobblin.writer.WriteCallback; +import org.apache.gobblin.writer.WriteResponse; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkResponse; + +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * A class to hold Futures and Callbacks to support Async writes + */ +@Slf4j +public class FutureCallbackHolder { + + @Getter + private final ActionListener actionListener; + private final BlockingQueue> writeResponseQueue = new ArrayBlockingQueue<>(1); + @Getter + private final Future future; + private final AtomicBoolean done = new AtomicBoolean(false); + + public FutureCallbackHolder(final @Nullable WriteCallback callback, + ExceptionLogger exceptionLogger, + final MalformedDocPolicy malformedDocPolicy) { + this.future = new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return done.get(); + } + + @Override + public WriteResponse get() + throws InterruptedException, ExecutionException { + Pair writeResponseThrowablePair = writeResponseQueue.take(); + return getWriteResponseorThrow(writeResponseThrowablePair); + } + + @Override + public WriteResponse get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + Pair writeResponseThrowablePair = writeResponseQueue.poll(timeout, unit); + if (writeResponseThrowablePair == null) { + throw new TimeoutException("Timeout exceeded while waiting for future to be done"); + } else { + return getWriteResponseorThrow(writeResponseThrowablePair); + } + } + }; + + this.actionListener = new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + boolean logicalErrors = false; + boolean serverErrors = false; + for (BulkItemResponse bulkItemResponse: bulkItemResponses) { + if (bulkItemResponse.isFailed()) { + // check if the failure is permanent (logical) or transient (server) + if (isLogicalError(bulkItemResponse)) { + // check error policy + switch (malformedDocPolicy) { + case IGNORE: { + log.debug("Document id {} was malformed with error {}", + bulkItemResponse.getId(), + bulkItemResponse.getFailureMessage()); + break; + } + case WARN: { + log.warn("Document id {} was malformed with error {}", + bulkItemResponse.getId(), + bulkItemResponse.getFailureMessage()); + break; + } + default: { + // Pass through + } + } + logicalErrors = true; + } else { + serverErrors = true; + } + } + } + if (serverErrors) { + onFailure(new RuntimeException("Partial failures in the batch: " + bulkItemResponses.buildFailureMessage())); + } else if (logicalErrors) { + // all errors found were logical, throw RuntimeException if policy says to Fail + switch (malformedDocPolicy) { + case FAIL: { + onFailure(new RuntimeException("Partial non-recoverable failures in the batch. 
To ignore these, set " + + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY + " to " + + MalformedDocPolicy.IGNORE.name())); + break; + } + default: { + WriteResponse writeResponse = new GenericWriteResponse(bulkItemResponses); + writeResponseQueue.add(new Pair(writeResponse, null)); + if (callback != null) { + callback.onSuccess(writeResponse); + } + } + } + } + } else { + WriteResponse writeResponse = new GenericWriteResponse(bulkItemResponses); + writeResponseQueue.add(new Pair(writeResponse, null)); + if (callback != null) { + callback.onSuccess(writeResponse); + } + } + } + + private boolean isLogicalError(BulkItemResponse bulkItemResponse) { + String failureMessage = bulkItemResponse.getFailureMessage(); + return failureMessage.contains("IllegalArgumentException") + || failureMessage.contains("illegal_argument_exception") + || failureMessage.contains("MapperParsingException") + || failureMessage.contains("mapper_parsing_exception"); + } + + @Override + public void onFailure(Exception exception) { + writeResponseQueue.add(new Pair(null, exception)); + if (exceptionLogger != null) { + exceptionLogger.log(exception); + } + if (callback != null) { + callback.onFailure(exception); + } + } + }; + } + + + private WriteResponse getWriteResponseorThrow(Pair writeResponseThrowablePair) + throws ExecutionException { + try { + if (writeResponseThrowablePair.getFirst() != null) { + return writeResponseThrowablePair.getFirst(); + } else if (writeResponseThrowablePair.getSecond() != null) { + throw new ExecutionException(writeResponseThrowablePair.getSecond()); + } else { + throw new ExecutionException(new RuntimeException("Could not find non-null WriteResponse pair")); + } + } finally { + done.set(true); + } + + } + + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java new file mode 100644 index 00000000000..4449d604940 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/MalformedDocPolicy.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +/** + * A class to represent different policies for handling malformed documents + */ +public enum MalformedDocPolicy { + IGNORE, // Ignore on failure + WARN, // Log warning on failure + FAIL // Fail on failure +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java new file mode 100644 index 00000000000..fb11d8dc917 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServer.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.gobblin.test.TestUtils; +import org.testng.Assert; + +import com.google.common.base.Throwables; + +import javax.annotation.concurrent.NotThreadSafe; +import lombok.extern.slf4j.Slf4j; + + +/** + * A Test ElasticSearch server + */ +@Slf4j +@NotThreadSafe +public class ElasticsearchTestServer { + + + private static final String ELASTICSEARCH_VERSION="5.6.8"; + private static final String TEST_ROOT_DIR="gobblin-modules/gobblin-elasticsearch/test-elasticsearch/"; + // The clean elasticsearch instance is installed here + private static final String BASE_ELASTICSEARCH_INSTALL =TEST_ROOT_DIR + "elasticsearch-" + ELASTICSEARCH_VERSION; + // Per-test elasticsearch instances are installed under a different directory + private static final String TEST_INSTALL_PREFIX =TEST_ROOT_DIR + "es-test-install-"; + private static final String ELASTICSEARCH_BIN="/bin/elasticsearch"; + private static final String ELASTICSEARCH_CONFIG_FILE= "/config/elasticsearch.yml"; + private static final String ELASTICSEARCH_JVMOPTS_FILE="/config/jvm.options"; + private final String _testId; + private final int _tcpPort; + private Process elasticProcess; + private final int _httpPort; + private String _pid = ManagementFactory.getRuntimeMXBean().getName(); + private final String _testInstallDirectory; + private AtomicBoolean _started = new AtomicBoolean(false); + + public ElasticsearchTestServer(String testId) + throws IOException { + this(testId, TestUtils.findFreePort(), TestUtils.findFreePort()); + } + + private ElasticsearchTestServer(String testId, int httpPort, int 
tcpPort) + throws IOException { + _testId = testId; + _httpPort = httpPort; + _tcpPort = tcpPort; + _testInstallDirectory = TEST_INSTALL_PREFIX + _testId; + try { + createInstallation(); + } + catch (Exception e) { + throw new IOException("Failed to create a test installation of elasticsearch", e); + } + configure(); + } + + public ElasticsearchTestServer() + throws IOException { + this(TestUtils.generateRandomAlphaString(25)); + } + + private void createInstallation() + throws IOException { + File srcDir = new File(BASE_ELASTICSEARCH_INSTALL); + if (!srcDir.exists()) { + throw new IOException("Could not find base elasticsearch instance installed at " + srcDir.getAbsolutePath() + "\n" + + "Run ./gradlew :gobblin-modules:gobblin-elasticsearch:installTestDependencies before running this test"); + } + File destDir = new File(_testInstallDirectory); + log.debug("About to recreate directory : {}", destDir.getPath()); + if (destDir.exists()) { + org.apache.commons.io.FileUtils.deleteDirectory(destDir); + } + + String[] commands = {"cp", "-r", srcDir.getAbsolutePath(), destDir.getAbsolutePath()}; + try { + log.debug("{}: Will run command: {}", this._pid, Arrays.toString(commands)); + Process copyProcess = new ProcessBuilder().inheritIO().command(commands).start(); + copyProcess.waitFor(); + } catch (Exception e) { + log.error("Failed to create installation directory at {}", destDir.getPath(), e); + Throwables.propagate(e); + } + } + + + + + private void configure() throws IOException { + File configFile = new File(_testInstallDirectory + ELASTICSEARCH_CONFIG_FILE); + FileOutputStream configFileStream = new FileOutputStream(configFile); + try { + configFileStream.write(("cluster.name: " + _testId + "\n").getBytes("UTF-8")); + configFileStream.write(("http.port: " + _httpPort + "\n").getBytes("UTF-8")); + configFileStream.write(("transport.tcp.port: " + _tcpPort + "\n").getBytes("UTF-8")); + } + finally { + configFileStream.close(); + } + + File jvmConfigFile = new File(_testInstallDirectory + ELASTICSEARCH_JVMOPTS_FILE); + try (Stream lines = Files.lines(jvmConfigFile.toPath())) { + List newLines = lines.map(line -> line.replaceAll("^\\s*(-Xm[s,x]).*$", "$1128m")) + .collect(Collectors.toList()); + Files.write(jvmConfigFile.toPath(), newLines); + } + } + + public void start(int maxStartupTimeSeconds) + { + if (_started.get()) { + log.warn("ElasticSearch server has already been attempted to be started... 
returning without doing anything"); + return; + } + _started.set(true); + + log.error("{}: Starting elasticsearch server on port {}", this._pid, this._httpPort); + String[] commands = {_testInstallDirectory + ELASTICSEARCH_BIN}; + + try { + log.error("{}: Will run command: {}", this._pid, Arrays.toString(commands)); + elasticProcess = new ProcessBuilder().inheritIO().command(commands).start(); + if (elasticProcess != null) { + // register destroy of process on shutdown in-case of unclean test termination + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + if (elasticProcess!=null) { + elasticProcess.destroy(); + } + } + }); + } + } catch (Exception e) { + log.error("Failed to start elasticsearch server", e); + Throwables.propagate(e); + } + + boolean isUp = false; + int numTries = maxStartupTimeSeconds * 2; + while (!isUp && numTries-- > 0) { + try { + Thread.sleep(500); // wait 1/2 second + isUp = isUp(); + } catch (Exception e) { + + } + } + Assert.assertTrue(isUp, "Server is not up!"); + } + + + public boolean isUp() + { + try { + URL url = new URL("http://localhost:" + _httpPort + "/_cluster/health?wait_for_status=green"); + long startTime = System.nanoTime(); + HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); + int responseCode = httpURLConnection.getResponseCode(); + log.info("Duration: {} seconds, Response code = {}", + (System.nanoTime() - startTime) / 1000000000.0, + responseCode); + if (responseCode == 200) { return true; } else {return false;} + } + catch (Exception e) { + Throwables.propagate(e); + return false; + } + } + + public int getTransportPort() { + return _tcpPort; + } + + + public int getHttpPort() { return _httpPort; } + + + public void stop() { + if (elasticProcess != null) { + try { + elasticProcess.destroy(); + elasticProcess = null; // set to null to prevent redundant call to destroy on shutdown + } catch (Exception e) { + log.warn("Failed to stop the ElasticSearch server", e); + } + } + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java new file mode 100644 index 00000000000..dc3294ddb20 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/ElasticsearchTestServerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch; + +import java.io.IOException; + +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + + +/** + * A Test to test that the {@link ElasticsearchTestServer} class does what it is supposed to do + */ +public class ElasticsearchTestServerTest { + + + ElasticsearchTestServer _elasticsearchTestServer; + + @BeforeSuite + public void startServer() + throws IOException { + _elasticsearchTestServer = new ElasticsearchTestServer(); + _elasticsearchTestServer.start(60); + } + @Test + public void testServerStart() + throws InterruptedException, IOException { + _elasticsearchTestServer.start(60); // second start should be a no-op + } + + @AfterSuite + public void stopServer() { + _elasticsearchTestServer.stop(); + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java new file mode 100644 index 00000000000..f12528dce1b --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ConfigBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.Setter; +import lombok.experimental.Accessors; + + +/** + * A helper class to build Config for Elasticsearch Writers + */ +@Accessors(chain=true) +public class ConfigBuilder { + @Setter + String indexName; + @Setter + String indexType; + @Setter + int httpPort; + @Setter + int transportPort; + @Setter + boolean idMappingEnabled = true; + @Setter + String clientType = "REST"; + @Setter + String typeMapperClassName; + @Setter + MalformedDocPolicy malformedDocPolicy; + + Config build() { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_CLIENT_TYPE, clientType); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, indexName); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, indexType); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, + "" + idMappingEnabled); + if (this.clientType.equalsIgnoreCase("rest")) { + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + httpPort); + } else if (this.clientType.equalsIgnoreCase("transport")) { + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_HOSTS, "localhost:" + transportPort); + } else throw new RuntimeException("Client type needs to be one of rest/transport"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, typeMapperClassName); + if (malformedDocPolicy != null) { + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY, + malformedDocPolicy.toString().toUpperCase()); + } + return ConfigFactory.parseProperties(props); + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java new file mode 100644 index 00000000000..46ae6807449 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchTransportClientWriterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.net.UnknownHostException; +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchTransportClientWriterTest { + + @Test + public void testBadSslConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_ID_MAPPING_ENABLED, "true"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_SSL_ENABLED, "true"); + Config config = ConfigFactory.parseProperties(props); + try { + new ElasticsearchTransportClientWriter(config); + Assert.fail("Writer should not be constructed"); + } + catch (Exception e) { + } + } + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java new file mode 100644 index 00000000000..341cd765501 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterBaseTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.net.UnknownHostException; +import java.util.Properties; + +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchWriterBaseTest { + + public static ElasticsearchWriterBase getWriterBase(Config config) + throws UnknownHostException { + return new ElasticsearchWriterBase(config) { + @Override + int getDefaultPort() { + return 0; + } + }; + } + + private void assertFailsToConstruct(Properties props, String testScenario) { + assertConstructionExpectation(props, testScenario, false); + } + + private void assertSucceedsToConstruct(Properties props, String testScenario) { + assertConstructionExpectation(props, testScenario, true); + } + + private void assertConstructionExpectation(Properties props, + String testScenario, + Boolean constructionSuccess) { + Config config = ConfigFactory.parseProperties(props); + try { + ElasticsearchWriterBase writer = getWriterBase(config); + if (!constructionSuccess) { + Assert.fail("Test Scenario: " + testScenario + ": Writer should not be constructed"); + } + } + catch (Exception e) { + if (constructionSuccess) { + Assert.fail("Test Scenario: " + testScenario + ": Writer should be constructed successfully"); + } + } + } + + @Test + public void testMinimalRequiredConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertSucceedsToConstruct(props, "minimal configuration"); + } + + @Test + public void testBadIndexNameConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_TYPE, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "index name missing"); + } + + + + @Test + public void testBadIndexNameCasingConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "Test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "bad index name casing"); + } + + @Test + public void testBadIndexTypeConfiguration() + throws UnknownHostException { + Properties props = new Properties(); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_INDEX_NAME, "test"); + props.setProperty(ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_TYPEMAPPER_CLASS, + AvroGenericRecordTypeMapper.class.getCanonicalName()); + assertFailsToConstruct(props, "no index type provided"); + } + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java 
b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java new file mode 100644 index 00000000000..746171c68bb --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchWriterIntegrationTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.math3.util.Pair; +import org.apache.gobblin.elasticsearch.ElasticsearchTestServer; +import org.apache.gobblin.test.AvroRecordGenerator; +import org.apache.gobblin.test.JsonRecordGenerator; +import org.apache.gobblin.test.PayloadType; +import org.apache.gobblin.test.RecordTypeGenerator; +import org.apache.gobblin.test.TestUtils; +import org.apache.gobblin.writer.AsyncWriterManager; +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.gobblin.writer.BufferedAsyncDataWriter; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.SequentialBasedBatchAccumulator; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class ElasticsearchWriterIntegrationTest { + + + private ElasticsearchTestServer _esTestServer; + private String pid = ManagementFactory.getRuntimeMXBean().getName(); + + private List variants; + private List recordGenerators; + + ElasticsearchWriterIntegrationTest() { + variants = ImmutableList.of(new RestWriterVariant(), + new TransportWriterVariant()); + recordGenerators = ImmutableList.of(new AvroRecordGenerator(), new JsonRecordGenerator()); + } + + @BeforeSuite + public void startServers() + throws IOException { + log.error("{}: Starting Elasticsearch Server", pid); + _esTestServer = new ElasticsearchTestServer(); + _esTestServer.start(60); + } + + @AfterSuite + public void stopServers() { + log.error("{}: Stopping Elasticsearch Server", pid); + _esTestServer.stop(); + } + + + @Test + public void testSingleRecordWrite() + throws IOException { + + for (WriterVariant writerVariant : variants) { + for (RecordTypeGenerator recordVariant : recordGenerators) { + + String indexName = "posts" + writerVariant.getName().toLowerCase(); + String indexType = recordVariant.getName(); + Config config = writerVariant.getConfigBuilder() + 
.setIndexName(indexName) + .setIndexType(indexType) + .setTypeMapperClassName(recordVariant.getTypeMapperClassName()) + .setHttpPort(_esTestServer.getHttpPort()) + .setTransportPort(_esTestServer.getTransportPort()) + .build(); + + TestClient testClient = writerVariant.getTestClient(config); + SequentialBasedBatchAccumulator batchAccumulator = new SequentialBasedBatchAccumulator<>(config); + BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, writerVariant.getBatchAsyncDataWriter(config)); + + + String id = TestUtils.generateRandomAlphaString(10); + Object testRecord = recordVariant.getRecord(id, PayloadType.STRING); + + DataWriter writer = AsyncWriterManager.builder().failureAllowanceRatio(0.0).retriesEnabled(false).config(config) + .asyncDataWriter(bufferedAsyncDataWriter).build(); + + try { + testClient.recreateIndex(indexName); + writer.write(testRecord); + writer.commit(); + } finally { + writer.close(); + } + + try { + GetResponse response = testClient.get(new GetRequest(indexName, indexType, id)); + Assert.assertEquals(response.getId(), id, "Response id matches request"); + Assert.assertEquals(response.isExists(), true, "Document not found"); + } catch (Exception e) { + Assert.fail("Failed to get a response", e); + } finally { + testClient.close(); + } + } + } + } + + @Test + public void testMalformedDocCombinations() + throws IOException { + for (WriterVariant writerVariant : variants) { + for (RecordTypeGenerator recordVariant : recordGenerators) { + for (MalformedDocPolicy policy : MalformedDocPolicy.values()) { + testMalformedDocs(writerVariant, recordVariant, policy); + } + } + } + } + + + + /** + * Sends two docs in a single batch with different field types + * Triggers Elasticsearch server to send back an exception due to malformed docs + * @throws IOException + */ + public void testMalformedDocs(WriterVariant writerVariant, RecordTypeGenerator recordVariant, MalformedDocPolicy malformedDocPolicy) + throws IOException { + + String indexName = writerVariant.getName().toLowerCase(); + String indexType = (recordVariant.getName()+malformedDocPolicy.name()).toLowerCase(); + Config config = writerVariant.getConfigBuilder() + .setIdMappingEnabled(true) + .setIndexName(indexName) + .setIndexType(indexType) + .setHttpPort(_esTestServer.getHttpPort()) + .setTransportPort(_esTestServer.getTransportPort()) + .setTypeMapperClassName(recordVariant.getTypeMapperClassName()) + .setMalformedDocPolicy(malformedDocPolicy) + .build(); + + + TestClient testClient = writerVariant.getTestClient(config); + testClient.recreateIndex(indexName); + + String id1=TestUtils.generateRandomAlphaString(10); + String id2=TestUtils.generateRandomAlphaString(10); + + Object testRecord1 = recordVariant.getRecord(id1, PayloadType.LONG); + Object testRecord2 = recordVariant.getRecord(id2, PayloadType.MAP); + + SequentialBasedBatchAccumulator batchAccumulator = new SequentialBasedBatchAccumulator<>(config); + BatchAsyncDataWriter elasticsearchWriter = writerVariant.getBatchAsyncDataWriter(config); + BufferedAsyncDataWriter bufferedAsyncDataWriter = new BufferedAsyncDataWriter(batchAccumulator, elasticsearchWriter); + + + DataWriter writer = AsyncWriterManager.builder() + .failureAllowanceRatio(0.0) + .retriesEnabled(false) + .config(config) + .asyncDataWriter(bufferedAsyncDataWriter) + .build(); + + try { + writer.write(testRecord1); + writer.write(testRecord2); + writer.commit(); + writer.close(); + if (malformedDocPolicy == MalformedDocPolicy.FAIL) { + 
Assert.fail("Should have thrown an exception if malformed doc policy was set to Fail"); + } + } + catch (Exception e) { + switch (malformedDocPolicy) { + case IGNORE:case WARN:{ + Assert.fail("Should not have failed if malformed doc policy was set to ignore or warn", e); + break; + } + case FAIL: { + // pass through + break; + } + default: { + throw new RuntimeException("This test does not handle this policyType : " + malformedDocPolicy.toString()); + } + } + } + + // Irrespective of policy, first doc should be inserted and second doc should fail + int docsIndexed = 0; + try { + { + GetResponse response = testClient.get(new GetRequest(indexName, indexType, id1)); + Assert.assertEquals(response.getId(), id1, "Response id matches request"); + System.out.println(malformedDocPolicy + ":" + response.toString()); + if (response.isExists()) { + docsIndexed++; + } + } + { + GetResponse response = testClient.get(new GetRequest(indexName, indexType, id2)); + Assert.assertEquals(response.getId(), id2, "Response id matches request"); + System.out.println(malformedDocPolicy + ":" + response.toString()); + if (response.isExists()) { + docsIndexed++; + } + } + // only one doc should be found + Assert.assertEquals(docsIndexed, 1, "Only one document should be indexed"); + } + catch (Exception e) { + Assert.fail("Failed to get a response", e); + } + finally { + testClient.close(); + } + } + + + + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java new file mode 100644 index 00000000000..94d8e5609e6 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/RestWriterVariant.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.testng.Assert; + +import com.typesafe.config.Config; + + +/** + * A variant that uses the {@link ElasticsearchRestWriter} + */ +public class RestWriterVariant implements WriterVariant { + + private ElasticsearchRestWriter _restWriter; + @Override + public String getName() { + return "rest"; + } + + @Override + public ConfigBuilder getConfigBuilder() { + return new ConfigBuilder() + .setClientType("REST"); + } + + @Override + public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config) + throws IOException { + _restWriter = new ElasticsearchRestWriter(config); + return _restWriter; + } + + @Override + public TestClient getTestClient(Config config) + throws IOException { + final ElasticsearchRestWriter restWriter = new ElasticsearchRestWriter(config); + final RestHighLevelClient highLevelClient = restWriter.getRestHighLevelClient(); + return new TestClient() { + @Override + public GetResponse get(GetRequest getRequest) + throws IOException { + return highLevelClient.get(getRequest); + } + + @Override + public void recreateIndex(String indexName) + throws IOException { + RestClient restClient = restWriter.getRestLowLevelClient(); + try { + restClient.performRequest("DELETE", "/" + indexName); + } catch (Exception e) { + // ok since index may not exist + } + + String indexSettings = "{\"settings\" : {\"index\":{\"number_of_shards\":1,\"number_of_replicas\":1}}}"; + HttpEntity entity = new StringEntity(indexSettings, ContentType.APPLICATION_JSON); + + Response putResponse = restClient.performRequest("PUT", "/" + indexName, Collections.emptyMap(), entity); + Assert.assertEquals(putResponse.getStatusLine().getStatusCode(),200, "Recreate index succeeded"); + } + + @Override + public void close() + throws IOException { + restWriter.close(); + + } + }; + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java new file mode 100644 index 00000000000..31f08b11739 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TestClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.Closeable; +import java.io.IOException; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; + + +/** + * An interface to describe a functional Elasticsearch client to aid in verification + * of test results + */ + +public interface TestClient extends Closeable { + GetResponse get(GetRequest getRequest) + throws IOException; + + void recreateIndex(String indexName) + throws IOException; +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java new file mode 100644 index 00000000000..eb28c9a2914 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/TransportWriterVariant.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
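Because TestClient extends Closeable, the verification step in the integration test can also be written with try-with-resources; a small sketch, where writerVariant, config, indexName, indexType and id are assumed to be set up as in ElasticsearchWriterIntegrationTest:

// Sketch only: verify that a single document landed in the index.
try (TestClient client = writerVariant.getTestClient(config)) {
  GetResponse response = client.get(new GetRequest(indexName, indexType, id));
  Assert.assertTrue(response.isExists(), "document should have been indexed");
}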
+ */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.testng.Assert; + +import com.typesafe.config.Config; + + +/** + * A variant that uses the {@link ElasticsearchTransportClientWriter} + */ +public class TransportWriterVariant implements WriterVariant { + @Override + public String getName() { + return "transport"; + } + + @Override + public ConfigBuilder getConfigBuilder() { + return new ConfigBuilder() + .setClientType("transport"); + } + + @Override + public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config) + throws IOException { + ElasticsearchTransportClientWriter transportClientWriter = new ElasticsearchTransportClientWriter(config); + return transportClientWriter; + } + + @Override + public TestClient getTestClient(Config config) + throws IOException { + final ElasticsearchTransportClientWriter transportClientWriter = new ElasticsearchTransportClientWriter(config); + final TransportClient transportClient = transportClientWriter.getTransportClient(); + return new TestClient() { + @Override + public GetResponse get(GetRequest getRequest) + throws IOException { + try { + return transportClient.get(getRequest).get(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public void recreateIndex(String indexName) + throws IOException { + DeleteIndexRequestBuilder dirBuilder = transportClient.admin().indices().prepareDelete(indexName); + try { + DeleteIndexResponse diResponse = dirBuilder.execute().actionGet(); + } catch (IndexNotFoundException ie) { + System.out.println("Index not found... that's ok"); + } + + CreateIndexRequestBuilder cirBuilder = transportClient.admin().indices().prepareCreate(indexName); + CreateIndexResponse ciResponse = cirBuilder.execute().actionGet(); + Assert.assertTrue(ciResponse.isAcknowledged(), "Create index succeeeded"); + } + + @Override + public void close() + throws IOException { + transportClientWriter.close(); + } + }; + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java new file mode 100644 index 00000000000..581ec2ef448 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/elasticsearch/writer/WriterVariant.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.elasticsearch.writer; + +import java.io.IOException; + +import org.apache.gobblin.writer.BatchAsyncDataWriter; + +import com.typesafe.config.Config; + + +/** + * An interface to implement Writer variants to enable generic testing + */ +public interface WriterVariant { + + String getName(); + + ConfigBuilder getConfigBuilder(); + + BatchAsyncDataWriter getBatchAsyncDataWriter(Config config) + throws IOException; + + TestClient getTestClient(Config config) + throws IOException; +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java new file mode 100644 index 00000000000..29433f30b81 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/AvroRecordGenerator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
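WriterVariant is the extension point that the integration test iterates over, so covering another client flavor only requires a new implementation. A hypothetical skeleton (class name and behavior are illustrative, not part of the patch):

// Hypothetical sketch of a third variant.
public class MockWriterVariant implements WriterVariant {
  @Override
  public String getName() { return "mock"; }

  @Override
  public ConfigBuilder getConfigBuilder() {
    return new ConfigBuilder().setClientType("REST"); // assumption: reuse the REST configuration shape
  }

  @Override
  public BatchAsyncDataWriter getBatchAsyncDataWriter(Config config) throws IOException {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public TestClient getTestClient(Config config) throws IOException {
    throw new UnsupportedOperationException("sketch only");
  }
}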
+ */ +package org.apache.gobblin.test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Collections; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper; + + +/** + * A generator of Avro records of type {@link GenericRecord} + */ +public class AvroRecordGenerator implements RecordTypeGenerator<GenericRecord> { + @Override + public String getName() { + return "avro"; + } + + @Override + public String getTypeMapperClassName() { + return AvroGenericRecordTypeMapper.class.getCanonicalName(); + } + + @Override + public GenericRecord getRecord(String id, PayloadType payloadType) { + GenericRecord record = getTestAvroRecord(id, payloadType); + return record; + } + + static GenericRecord getTestAvroRecord(String identifier, PayloadType payloadType) { + Schema dataRecordSchema = + SchemaBuilder.record("Data").fields().name("data").type().bytesType().noDefault().name("flags").type().intType() + .noDefault().endRecord(); + + Schema schema; + Object payloadValue; + switch (payloadType) { + case STRING: { + schema = SchemaBuilder.record("TestRecord").fields() + .name("id").type().stringType().noDefault() + .name("key").type().stringType().noDefault() + .name("data").type(dataRecordSchema).noDefault() + .endRecord(); + payloadValue = TestUtils.generateRandomAlphaString(20); + break; + } + case LONG: { + schema = SchemaBuilder.record("TestRecord").fields() + .name("id").type().stringType().noDefault() + .name("key").type().longType().noDefault() + .name("data").type(dataRecordSchema).noDefault() + .endRecord(); + payloadValue = TestUtils.generateRandomLong(); + break; + } + case MAP: { + schema = SchemaBuilder.record("TestRecord").fields() + .name("id").type().stringType().noDefault() + .name("key").type().map().values().stringType().noDefault() + .name("data").type(dataRecordSchema).noDefault() + .endRecord(); + payloadValue = Collections.EMPTY_MAP; + break; + } + default: { + throw new RuntimeException("Do not know how to handle this type"); + } + } + + GenericData.Record testRecord = new GenericData.Record(schema); + + String testContent = "hello world"; + + GenericData.Record dataRecord = new GenericData.Record(dataRecordSchema); + dataRecord.put("data", ByteBuffer.wrap(testContent.getBytes(Charset.forName("UTF-8")))); + dataRecord.put("flags", 0); + + testRecord.put("key", payloadValue); + testRecord.put("data", dataRecord); + testRecord.put("id", identifier); + return testRecord; + } + +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java new file mode 100644 index 00000000000..7aea277a0f1 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/JsonRecordGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.test; + +import java.util.Collections; + +import org.apache.gobblin.elasticsearch.typemapping.JsonTypeMapper; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; + + +/** + * A generator of {@link JsonElement} records + */ +public class JsonRecordGenerator implements RecordTypeGenerator<JsonElement> { + private final Gson gson = new Gson(); + + @Override + public String getName() { + return "json"; + } + + @Override + public String getTypeMapperClassName() { + return JsonTypeMapper.class.getCanonicalName(); + } + + static class TestObject<T> { + private String id; + private T key; + + TestObject(String id, T payload) { + this.id = id; + this.key = payload; + } + } + + @Override + public JsonElement getRecord(String id, PayloadType payloadType) { + Object testObject; + switch (payloadType) { + case STRING: { + testObject = new TestObject(id, TestUtils.generateRandomAlphaString(20)); + break; + } + case LONG: { + testObject = new TestObject(id, TestUtils.generateRandomLong()); + break; + } + case MAP: { + testObject = new TestObject(id, Collections.EMPTY_MAP); + break; + } + default: + throw new RuntimeException("Do not know how to handle this type of payload"); + } + JsonElement jsonElement = gson.toJsonTree(testObject); + return jsonElement; + } +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java new file mode 100644 index 00000000000..d793c474a1a --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/PayloadType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
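The PayloadType variants are what let testMalformedDocs provoke a server-side error: the same "key" field is written first as a long and then as an object, which the test's javadoc describes as triggering an exception for malformed docs (the dynamic-mapping-conflict reading in the comments below is an inference, not something the patch states):

// Sketch only.
RecordTypeGenerator<JsonElement> generator = new JsonRecordGenerator();
JsonElement first = generator.getRecord("id1", PayloadType.LONG);  // "key" is indexed as a numeric field
JsonElement second = generator.getRecord("id2", PayloadType.MAP);  // "key" is now an object, so the second doc is rejected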
+ */ +package org.apache.gobblin.test; + +/** + * An enumeration of Payload types + * Used to configure the record in tests + */ +public enum PayloadType { + STRING, + LONG, + MAP +} diff --git a/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java new file mode 100644 index 00000000000..00c798fab62 --- /dev/null +++ b/gobblin-modules/gobblin-elasticsearch/src/test/java/org/apache/gobblin/test/RecordTypeGenerator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.test; + +/** + * An interface to describe a generator of records + */ +public interface RecordTypeGenerator<T> { + /** + * The name of this record type + * @return + */ + String getName(); + + /** + * A {@link org.apache.gobblin.elasticsearch.typemapping.TypeMapper} that can work with + * records of this type + * @return + */ + String getTypeMapperClassName(); + + /** + * Generate a record with the provided characteristics + * @param identifier + * @param payloadType + * @return a record of the type T + */ + T getRecord(String identifier, PayloadType payloadType); +} diff --git a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java index ab4bffafd5b..9db378224a6 100644 --- a/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java +++ b/gobblin-modules/gobblin-eventhub/src/test/java/org/apache/gobblin/eventhub/writer/EventhubBatchTest.java @@ -19,6 +19,8 @@ import java.io.IOException; import org.apache.gobblin.writer.BytesBoundedBatch; +import org.apache.gobblin.writer.LargeMessagePolicy; +import org.apache.gobblin.writer.RecordTooLargeException; import org.testng.Assert; import org.testng.annotations.Test; @@ -28,41 +30,44 @@ public class EventhubBatchTest { @Test - public void testBatchWithLargeRecord() throws IOException { + public void testBatchWithLargeRecord() + throws IOException, RecordTooLargeException { // Assume memory size has only 2 bytes BytesBoundedBatch batch = new BytesBoundedBatch(8, 3000); String record = "abcdefgh"; // Record is larger than the memory size limit, the first append should fail - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP)); // The second append should still fail - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, LargeMessagePolicy.DROP)); } @Test - public void 
testBatch() throws IOException { + public void testBatch() + throws IOException, RecordTooLargeException { // Assume memory size has only 200 bytes BytesBoundedBatch batch = new BytesBoundedBatch(200, 3000); // Add additional 15 bytes overhead, total size is 27 bytes String record = "abcdefgh"; - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); + LargeMessagePolicy policy = LargeMessagePolicy.DROP; + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); // Batch has room for 8th record - Assert.assertEquals(batch.hasRoom(record), true); - Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertEquals(batch.hasRoom(record, policy), true); + Assert.assertNotNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); // Batch has no room for 9th record - Assert.assertEquals(batch.hasRoom(record), false); - Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY)); + Assert.assertEquals(batch.hasRoom(record, policy), false); + Assert.assertNull(batch.tryAppend(record, WriteCallback.EMPTY, policy)); } } diff --git a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java index a58319818a4..68c79e93e17 100644 --- a/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java +++ b/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java @@ -45,6 +45,27 @@ public static byte[] generateRandomBytes(int numBytes) { return messageBytes; } + private static final char[] alphas = new char[26]; + + public static Long generateRandomLong() { + return rng.nextLong(); + } + + static { + char ch = 'a'; + for (int i = 0; i < 26; i++) { + alphas[i] = ch++; + } + } + + public static String generateRandomAlphaString(int stringLength) { + char[] newString = new char[stringLength]; + for (int i = 0; i < stringLength; ++i) + { + newString[i] = alphas[rng.nextInt(26)]; + } + return new String(newString); + } /** * TODO: Currently generates a static schema avro record. 
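The EventhubBatchTest changes above track the new tryAppend signature, which now takes a LargeMessagePolicy and declares RecordTooLargeException. A rough sketch of the contract as this test exercises it; the FAIL-style behavior is an assumption based on the new throws clause, since only DROP appears here:

// Sketch only.
BytesBoundedBatch batch = new BytesBoundedBatch(8, 3000); // 8-byte memory limit, as in the test above
try {
  Object appended = batch.tryAppend("abcdefgh", WriteCallback.EMPTY, LargeMessagePolicy.DROP);
  if (appended == null) {
    // the record exceeded the batch's memory limit and was dropped under the DROP policy
  }
} catch (RecordTooLargeException e) {
  // presumably thrown only when the policy treats oversized records as errors instead of dropping them
}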
diff --git a/gradle/scripts/globalDependencies.gradle b/gradle/scripts/globalDependencies.gradle index d1d2e03c327..d64db6759ab 100644 --- a/gradle/scripts/globalDependencies.gradle +++ b/gradle/scripts/globalDependencies.gradle @@ -23,20 +23,22 @@ subprojects { configurations { compile dependencies { - compile(externalDependency.hadoopCommon) { - exclude module: 'servlet-api' - } - compile externalDependency.hadoopClientCore - compile externalDependency.hadoopAnnotations - if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) { - compile externalDependency.hadoopClientCommon - } - compile(externalDependency.guava) { - force = true - } - compile(externalDependency.commonsCodec) { - force = true - } + if (!project.name.contains('gobblin-elasticsearch-deps')) { + compile(externalDependency.hadoopCommon) { + exclude module: 'servlet-api' + } + compile externalDependency.hadoopClientCore + compile externalDependency.hadoopAnnotations + if (project.name.equals('gobblin-runtime') || project.name.equals('gobblin-test')) { + compile externalDependency.hadoopClientCommon + } + compile(externalDependency.guava) { + force = true + } + } + compile(externalDependency.commonsCodec) { + force = true + } // Required to add JDK's tool jar, which is required to run byteman tests. testCompile (files(((URLClassLoader) ToolProvider.getSystemToolClassLoader()).getURLs()))