[GOBBLIN-17] Add Elasticsearch writer (rest + transport)
Closes #2419 from shirshanka/elastic
shirshanka authored and abti committed Sep 1, 2018
1 parent ef438c8 commit f1bc746
Showing 46 changed files with 3,005 additions and 61 deletions.
@@ -340,6 +340,8 @@ public void onFailure(Throwable throwable) {
.update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
}
if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // attempts must == numRetries + 1
log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(),
attempt.getRecord().toString());
attempt.incAttempt();
attempt.setPrevAttemptFailure(throwable);
AsyncWriterManager.this.retryQueue.get().add(attempt);
@@ -391,6 +393,7 @@ public void run() {
Attempt attempt = this.retryQueue.take();
if (attempt != null) {
maybeSleep(attempt.getPrevAttemptTimestampNanos());
log.debug("Retry thread will retry record: {}", attempt.getRecord().toString());
attemptWrite(attempt);
}
} catch (InterruptedException e) {
@@ -127,17 +127,18 @@ public void onFailure (Throwable throwable) {
/**
* A method to check if the batch has the room to add a new record
* @param record: record needs to be added
* @param largeMessagePolicy: the policy that is in effect for large messages
* @return Indicates whether this batch still has enough space to hold a new record
*/
public abstract boolean hasRoom (D record);
public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy);

/**
* Add a record to this batch
* <p>
* Implementation of this method should always ensure the record can be added successfully
* The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is this method
* The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is this method
* is responsible for adding record to internal batch memory and the check for the room space is performed
* by {@link Batch#hasRoom(Object)}. All the potential issues for adding a record should
* by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All the potential issues for adding a record should
* already be resolved before this method is invoked.
* </p>
*
@@ -162,14 +163,19 @@ public void onFailure (Throwable throwable) {
*
* @param record : record needs to be added
* @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged
* @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch
* @return A future object which contains {@link RecordMetadata}
*/
public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
if (!hasRoom(record)) {
LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes");
public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy)
throws RecordTooLargeException {
if (!hasRoom(record, largeMessagePolicy)) {
LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes",
record.toString(), getCurrentSizeInByte());
if (largeMessagePolicy == LargeMessagePolicy.FAIL) {
throw new RecordTooLargeException();
}
return null;
}

this.append(record);
thunks.add(new Thunk(callback, getRecordSizeInByte(record)));
RecordFuture future = new RecordFuture(latch, recordCount);
@@ -178,7 +184,9 @@ public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
}

public void await() throws InterruptedException{
LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount);
this.latch.await();
LOG.debug("Batch {} done with {} records", this.id, this.recordCount);
}

}
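
A hedged sketch of the contract above (not code from this commit): a caller of the new tryAppend signature either gets a future back, gets null when the record was not appended, or sees a RecordTooLargeException under the FAIL policy. The batch, record, callback, and policy names below are placeholders for whatever the caller has in scope.

// Minimal sketch, assuming a Batch<byte[]> instance and the new three-argument tryAppend.
Future<RecordMetadata> appendOrNull(Batch<byte[]> batch, byte[] record,
    WriteCallback callback, LargeMessagePolicy policy) {
  try {
    // Returns a future on success, or null when the record does not fit this batch
    // (always null for an oversized record under LargeMessagePolicy.DROP).
    return batch.tryAppend(record, callback, policy);
  } catch (RecordTooLargeException e) {
    // Thrown only under LargeMessagePolicy.FAIL; SequentialBasedBatchAccumulator catches
    // this for the current batch and retries once on a fresh batch before giving up.
    return null;
  }
}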
@@ -39,7 +39,7 @@
* @param <D> data record type
*/
@Alpha
public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {

private RecordProcessor<D> processor;
private BatchAccumulator<D> accumulator;
@@ -136,7 +136,7 @@ private WriteCallback createBatchCallback (final Batch<D> batch) {
return new WriteCallback<Object>() {
@Override
public void onSuccess(WriteResponse writeResponse) {
LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
batch.onSuccess(writeResponse);
batch.done();
accumulator.deallocate(batch);
@@ -62,7 +62,11 @@ void append (D record) {
records.add(record);
}

boolean hasRoom (D record) {
boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
// there is always space for one record, no matter how big :)
return true;
}
long recordLen = BytesBoundedBatch.this.getInternalSize(record);
return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit;
}
@@ -80,8 +84,8 @@ public List<D> getRecords() {
return memory.getRecords();
}

public boolean hasRoom (D object) {
return memory.hasRoom(object);
public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) {
return memory.hasRoom(object, largeMessagePolicy);
}

public void append (D object) {
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.writer;

/**
* Describes how single messages that are larger than a batch message limit should be treated
*/
public enum LargeMessagePolicy {
DROP, // drop (and log) messages that exceed the threshold
ATTEMPT, // attempt to deliver messages that exceed the threshold
FAIL // throw an error when this happens
}
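
A hedged sketch of picking a policy when building the batch accumulator (not part of this commit); it relies on the four-argument SequentialBasedBatchAccumulator constructor added further down in this diff, and the size, TTL, and capacity values are placeholders only.

// Placeholder values: ~1 MB batch limit, 3 s TTL, at most 100 queued batches.
// DROP logs and discards any record larger than the batch limit instead of failing the job.
SequentialBasedBatchAccumulator<byte[]> accumulator =
    new SequentialBasedBatchAccumulator<>(1024 * 1024, 3000, 100, LargeMessagePolicy.DROP);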
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.writer;

public class RecordTooLargeException extends Exception {
}
@@ -43,14 +43,16 @@
* keeps in the deque until a TTL is expired.
*/

public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {

private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>();
private IncompleteRecordBatches incomplete = new IncompleteRecordBatches();
private final long batchSizeLimit;
private final long memSizeLimit;
private final double tolerance = 0.95;
private final long expireInMilliSecond;
private final LargeMessagePolicy largeMessagePolicy;
private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);

private final ReentrantLock dqLock = new ReentrantLock();
@@ -63,24 +65,31 @@ public SequentialBasedBatchAccumulator() {
}

public SequentialBasedBatchAccumulator(Properties properties) {
Config config = ConfigUtils.propertiesToConfig(properties);
this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE,
Batch.BATCH_SIZE_DEFAULT);

this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL,
Batch.BATCH_TTL_DEFAULT);

this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
Batch.BATCH_QUEUE_CAPACITY_DEFAULT);
this(ConfigUtils.propertiesToConfig(properties));
}

this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
public SequentialBasedBatchAccumulator(Config config) {
this(ConfigUtils.getLong(config, Batch.BATCH_SIZE,
Batch.BATCH_SIZE_DEFAULT),
ConfigUtils.getLong(config, Batch.BATCH_TTL,
Batch.BATCH_TTL_DEFAULT),
ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
Batch.BATCH_QUEUE_CAPACITY_DEFAULT));
}

public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) {
this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY);
}

public SequentialBasedBatchAccumulator(long batchSizeLimit,
long expireInMilliSecond,
long capacity,
LargeMessagePolicy largeMessagePolicy) {
this.batchSizeLimit = batchSizeLimit;
this.expireInMilliSecond = expireInMilliSecond;
this.capacity = capacity;
this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
this.largeMessagePolicy = largeMessagePolicy;
}

public long getNumOfBatches () {
@@ -101,7 +110,12 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t
try {
BytesBoundedBatch last = dq.peekLast();
if (last != null) {
Future<RecordMetadata> future = last.tryAppend(record, callback);
Future<RecordMetadata> future = null;
try {
future = last.tryAppend(record, callback, this.largeMessagePolicy);
} catch (RecordTooLargeException e) {
// Ok if the record was too large for the current batch
}
if (future != null) {
return future;
}
@@ -110,12 +124,18 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t
// Create a new batch because previous one has no space
BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond);
LOG.debug("Batch " + batch.getId() + " is generated");
Future<RecordMetadata> future = batch.tryAppend(record, callback);
Future<RecordMetadata> future = null;
try {
future = batch.tryAppend(record, callback, this.largeMessagePolicy);
} catch (RecordTooLargeException e) {
// If a new batch also wasn't able to accommodate the new message
throw new RuntimeException("Failed due to a message that was too large", e);
}

// Even single record can exceed the batch size limit
// Ignore the record because Eventhub can only accept payload less than 256KB
// The future might be null, since the largeMessagePolicy might be set to DROP
if (future == null) {
LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: "
assert largeMessagePolicy.equals(LargeMessagePolicy.DROP);
LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: "
+ record);
future = Futures.immediateFuture(new RecordMetadata(0));
callback.onSuccess(WriteResponse.EMPTY);
@@ -124,6 +144,7 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t

// if queue is full, we should not add more
while (dq.size() >= this.capacity) {
LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity);
this.notFull.await();
}
dq.addLast(batch);
@@ -187,7 +208,7 @@ public Batch<D> getNextAvailableBatch () {
return dq.poll();
} else {
while (dq.size() == 0) {
LOG.info ("ready to sleep because of queue is empty");
LOG.debug ("ready to sleep because of queue is empty");
SequentialBasedBatchAccumulator.this.notEmpty.await();
if (SequentialBasedBatchAccumulator.this.isClosed()) {
return dq.poll();
@@ -203,7 +224,7 @@ public Batch<D> getNextAvailableBatch () {

if (dq.size() == 1) {
if (dq.peekFirst().isTTLExpire()) {
LOG.info ("Batch " + dq.peekFirst().getId() + " is expired");
LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired");
BytesBoundedBatch candidate = dq.poll();
SequentialBasedBatchAccumulator.this.notFull.signal();
return candidate;
@@ -240,12 +261,16 @@ public void close() {
public void flush() {
try {
ArrayList<Batch> batches = this.incomplete.all();
LOG.info ("flush on {} batches", batches.size());
int numOutstandingRecords = 0;
for (Batch batch: batches) {
numOutstandingRecords += batch.getRecords().size();
}
LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords);
for (Batch batch: batches) {
batch.await();
}
} catch (Exception e) {
LOG.info ("Error happens when flushing");
LOG.error ("Error happened while flushing batches");
}
}

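
Tying it together, a hedged usage sketch (not from this commit) of enqueueing one record and waiting for outstanding batches. In practice a BufferedAsyncDataWriter drains the accumulator and fires the callbacks; the method below only illustrates the API surface and assumes a prebuilt accumulator.

// Minimal sketch; enqueue may throw checked exceptions, so the method declares throws Exception.
void enqueueAndFlush(SequentialBasedBatchAccumulator<byte[]> accumulator) throws Exception {
  WriteCallback<Object> callback = new WriteCallback<Object>() {
    @Override
    public void onSuccess(WriteResponse writeResponse) { /* record acknowledged with its batch */ }
    @Override
    public void onFailure(Throwable throwable) { /* the batch holding this record failed */ }
  };
  Future<RecordMetadata> ack = accumulator.enqueue(new byte[]{1, 2, 3}, callback);
  accumulator.flush(); // blocks until every incomplete batch has been acknowledged
}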
1 change: 1 addition & 0 deletions gobblin-distribution/gobblin-flavor-standard.gradle
@@ -21,4 +21,5 @@ dependencies {
compile project(':gobblin-modules:gobblin-crypto-provider')
compile project(':gobblin-modules:gobblin-kafka-08')
compile project(':gobblin-modules:google-ingestion')
compile project(':gobblin-modules:gobblin-elasticsearch')
}
64 changes: 64 additions & 0 deletions gobblin-example/src/main/resources/wikipedia-elastic.conf
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A sample pull file that pulls a few revisions of a Wikipedia page
# and writes them to ElasticSearch
job {
name=PullFromWikipediaToElasticSearch
group=Wikipedia
description=Pull from Wikipedia and write to ElasticSearch
}

task.maxretries=0

source {
class=org.apache.gobblin.example.wikipedia.WikipediaSource
page.titles="Wikipedia:Sandbox"
revisions.cnt=5
}

wikipedia {
api.rooturl="https://en.wikipedia.org/w/api.php"
avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\", \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\", \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\", \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\", \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}"
}
converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter
extract.namespace=org.apache.gobblin.example.wikipedia

writer {
builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
elasticsearch {
client.type=REST
index.name=wikipedia-test
index.type=docs
#hosts=hostname
#ssl {
# enabled=true
# keystoreType=pkcs12
# keystorePassword=change_me
# keystoreLocation=/path/to/.p12 file
# truststoreType=jks
# truststoreLocation=/path/to/cacerts
# truststorePassword=changeme
#}
typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper
useIdFromData=false # change to true if you want to use a field from the record as the id field
#idFieldName=id # change to the field of the record that you want to use as the id of the document
}
}

data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
