[Gobblin-17] Add Elasticsearch writer (rest + transport)
shirshanka committed Aug 31, 2018
1 parent 749b5bd commit 1182be1
Showing 46 changed files with 3,005 additions and 61 deletions.
@@ -340,6 +340,8 @@ public void onFailure(Throwable throwable) {
.update(currTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
}
if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) { // total attempts will equal numRetries + 1
log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", attempt.attemptNum, throwable.getMessage(),
attempt.getRecord().toString());
attempt.incAttempt();
attempt.setPrevAttemptFailure(throwable);
AsyncWriterManager.this.retryQueue.get().add(attempt);
@@ -391,6 +393,7 @@ public void run() {
Attempt attempt = this.retryQueue.take();
if (attempt != null) {
maybeSleep(attempt.getPrevAttemptTimestampNanos());
log.debug("Retry thread will retry record: {}", attempt.getRecord().toString());
attemptWrite(attempt);
}
} catch (InterruptedException e) {
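For context, the two debug lines added above instrument a retry loop that follows a common async-writer pattern: a failed attempt is re-enqueued with an incremented attempt count until it exceeds numRetries, and a dedicated thread drains the retry queue and re-issues the write. A minimal standalone sketch of that pattern (the Attempt shape and the numRetries value are assumptions for illustration, not code from this commit):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class RetryLoopSketch<R> {
  static final class Attempt<R> {
    final R record;
    int attemptNum = 1;
    Throwable prevFailure;
    Attempt(R record) { this.record = record; }
  }

  private final BlockingQueue<Attempt<R>> retryQueue = new LinkedBlockingQueue<>();
  private final int numRetries = 3; // assumed; total attempts == numRetries + 1

  // Invoked from the write callback's onFailure.
  void onFailure(Attempt<R> attempt, Throwable throwable) {
    if (attempt.attemptNum <= numRetries) {
      attempt.attemptNum++;
      attempt.prevFailure = throwable;
      retryQueue.add(attempt); // re-enqueue for the retry thread
    }
  }

  // Body of the dedicated retry thread.
  void retryLoop() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Attempt<R> attempt = retryQueue.take(); // blocks until a failure is enqueued
        attemptWrite(attempt);                  // re-issue the write
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  void attemptWrite(Attempt<R> attempt) { /* issue the async write here */ }
}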
@@ -127,17 +127,18 @@ public void onFailure (Throwable throwable) {
/**
* Checks whether the batch has room to add a new record
* @param record: the record to be added
* @param largeMessagePolicy: the policy in effect for large messages
* @return true if this batch still has enough space to hold the new record
*/
public abstract boolean hasRoom (D record);
public abstract boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy);

/**
* Add a record to this batch
* <p>
* Implementations of this method should always ensure the record can be added successfully.
* The contract between {@link Batch#tryAppend(Object, WriteCallback)} and this method is that this method
* The contract between {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} and this method is that this method
* is responsible for adding the record to internal batch memory, while the check for room is performed
* by {@link Batch#hasRoom(Object)}. All potential issues with adding a record should
* by {@link Batch#hasRoom(Object, LargeMessagePolicy)}. All potential issues with adding a record should
* already be resolved before this method is invoked.
* </p>
*
@@ -162,14 +163,19 @@ public void onFailure (Throwable throwable) {
*
* @param record : the record to be added
* @param callback : A callback which will be invoked when the whole batch gets sent and acknowledged
* @param largeMessagePolicy : the {@link LargeMessagePolicy} that is in effect for this batch
* @return A future object which contains {@link RecordMetadata}
*/
public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
if (!hasRoom(record)) {
LOG.debug ("Cannot add " + record + " to previous batch because the batch already has " + getCurrentSizeInByte() + " bytes");
public Future<RecordMetadata> tryAppend(D record, WriteCallback callback, LargeMessagePolicy largeMessagePolicy)
throws RecordTooLargeException {
if (!hasRoom(record, largeMessagePolicy)) {
LOG.debug ("Cannot add {} to previous batch because the batch already has {} bytes",
record.toString(), getCurrentSizeInByte());
if (largeMessagePolicy == LargeMessagePolicy.FAIL) {
throw new RecordTooLargeException();
}
return null;
}

this.append(record);
thunks.add(new Thunk(callback, getRecordSizeInByte(record)));
RecordFuture future = new RecordFuture(latch, recordCount);
@@ -178,7 +184,9 @@ public Future<RecordMetadata> tryAppend(D record, WriteCallback callback) {
}

public void await() throws InterruptedException{
LOG.debug("Batch {} waiting for {} records", this.id, this.recordCount);
this.latch.await();
LOG.debug("Batch {} done with {} records", this.id, this.recordCount);
}

}
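The tryAppend contract above distinguishes two "no" answers: a null return means the record does not fit in this particular batch (the caller should seal it and try a fresh one), while RecordTooLargeException, thrown only under LargeMessagePolicy.FAIL, means the record can never fit. A hedged caller-side sketch of honoring that contract (MyRecord and newBatch() are placeholders, not code from this commit):

// Under ATTEMPT, an empty batch always has room, so the second tryAppend succeeds.
Future<RecordMetadata> appendOrRoll(Batch<MyRecord> current, MyRecord record, WriteCallback callback)
    throws RecordTooLargeException {
  Future<RecordMetadata> future = current.tryAppend(record, callback, LargeMessagePolicy.ATTEMPT);
  if (future == null) {
    // No room left in the current batch: seal it and retry on a fresh batch.
    Batch<MyRecord> fresh = newBatch();
    future = fresh.tryAppend(record, callback, LargeMessagePolicy.ATTEMPT);
  }
  return future;
}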
@@ -39,7 +39,7 @@
* @param <D> data record type
*/
@Alpha
public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
public class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {

private RecordProcessor<D> processor;
private BatchAccumulator<D> accumulator;
@@ -136,7 +136,7 @@ private WriteCallback createBatchCallback (final Batch<D> batch) {
return new WriteCallback<Object>() {
@Override
public void onSuccess(WriteResponse writeResponse) {
LOG.info ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
LOG.debug ("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
batch.onSuccess(writeResponse);
batch.done();
accumulator.deallocate(batch);
@@ -62,7 +62,11 @@ void append (D record) {
records.add(record);
}

boolean hasRoom (D record) {
boolean hasRoom (D record, LargeMessagePolicy largeMessagePolicy) {
if (records.isEmpty() && largeMessagePolicy == LargeMessagePolicy.ATTEMPT) {
// there is always space for one record, no matter how big :)
return true;
}
long recordLen = BytesBoundedBatch.this.getInternalSize(record);
return (byteSize + recordLen) <= BytesBoundedBatch.this.memSizeLimit;
}
@@ -80,8 +84,8 @@ public List<D> getRecords() {
return memory.getRecords();
}

public boolean hasRoom (D object) {
return memory.hasRoom(object);
public boolean hasRoom (D object, LargeMessagePolicy largeMessagePolicy) {
return memory.hasRoom(object, largeMessagePolicy);
}

public void append (D object) {
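The room check above is plain additive byte accounting with one carve-out: under LargeMessagePolicy.ATTEMPT an empty batch admits any record, however large. A toy standalone version of the same logic (parameter names are illustrative, not from this commit):

static boolean hasRoom(long byteSize, long recordLen, long memSizeLimit,
                       boolean batchIsEmpty, LargeMessagePolicy policy) {
  if (batchIsEmpty && policy == LargeMessagePolicy.ATTEMPT) {
    return true; // there is always space for the first record
  }
  return (byteSize + recordLen) <= memSizeLimit;
}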
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.writer;

/**
* Describes how single messages larger than the batch size limit should be treated
*/
public enum LargeMessagePolicy {
DROP, // drop (and log) messages that exceed the threshold
ATTEMPT, // attempt to deliver messages that exceed the threshold
FAIL // throw an error when a message exceeds the threshold
}
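Grounded in how the rest of this commit uses the enum: an oversized record is admitted into a fresh batch under ATTEMPT, logged and silently completed under DROP, and surfaced as a RecordTooLargeException under FAIL (which SequentialBasedBatchAccumulator.enqueue rethrows as a RuntimeException). A small sketch summarizing the observable behavior:

// What a caller observes for a record exceeding the batch limit,
// per SequentialBasedBatchAccumulator in this commit.
static String oversizedRecordOutcome(LargeMessagePolicy policy) {
  switch (policy) {
    case ATTEMPT: return "admitted into a fresh batch; delivery is attempted";
    case DROP:    return "dropped with an error log; its future and callback complete immediately";
    case FAIL:    return "rejected: tryAppend throws RecordTooLargeException";
    default:      throw new IllegalStateException("unknown policy: " + policy);
  }
}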
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.writer;

/**
 * Thrown by {@link Batch#tryAppend(Object, WriteCallback, LargeMessagePolicy)} when a record
 * exceeds the batch size limit and the {@link LargeMessagePolicy} in effect is
 * {@link LargeMessagePolicy#FAIL}.
 */
public class RecordTooLargeException extends Exception {
}
@@ -43,14 +43,16 @@
* remains in the deque until its TTL expires.
*/

public abstract class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {

private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
private Deque<BytesBoundedBatch<D>> dq = new LinkedList<>();
private IncompleteRecordBatches incomplete = new IncompleteRecordBatches();
private final long batchSizeLimit;
private final long memSizeLimit;
private final double tolerance = 0.95;
private final long expireInMilliSecond;
private final LargeMessagePolicy largeMessagePolicy;
private static final Logger LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);

private final ReentrantLock dqLock = new ReentrantLock();
@@ -63,24 +65,31 @@ public SequentialBasedBatchAccumulator() {
}

public SequentialBasedBatchAccumulator(Properties properties) {
Config config = ConfigUtils.propertiesToConfig(properties);
this.batchSizeLimit = ConfigUtils.getLong(config, Batch.BATCH_SIZE,
Batch.BATCH_SIZE_DEFAULT);

this.expireInMilliSecond = ConfigUtils.getLong(config, Batch.BATCH_TTL,
Batch.BATCH_TTL_DEFAULT);

this.capacity = ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
Batch.BATCH_QUEUE_CAPACITY_DEFAULT);
this(ConfigUtils.propertiesToConfig(properties));
}

this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
public SequentialBasedBatchAccumulator(Config config) {
this(ConfigUtils.getLong(config, Batch.BATCH_SIZE,
Batch.BATCH_SIZE_DEFAULT),
ConfigUtils.getLong(config, Batch.BATCH_TTL,
Batch.BATCH_TTL_DEFAULT),
ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY,
Batch.BATCH_QUEUE_CAPACITY_DEFAULT));
}

public SequentialBasedBatchAccumulator(long batchSizeLimit, long expireInMilliSecond, long capacity) {
this(batchSizeLimit, expireInMilliSecond, capacity, DEFAULT_LARGE_MESSAGE_POLICY);
}

public SequentialBasedBatchAccumulator(long batchSizeLimit,
long expireInMilliSecond,
long capacity,
LargeMessagePolicy largeMessagePolicy) {
this.batchSizeLimit = batchSizeLimit;
this.expireInMilliSecond = expireInMilliSecond;
this.capacity = capacity;
this.memSizeLimit = (long) (this.tolerance * this.batchSizeLimit);
this.largeMessagePolicy = largeMessagePolicy;
}

public long getNumOfBatches () {
@@ -101,7 +110,12 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t
try {
BytesBoundedBatch last = dq.peekLast();
if (last != null) {
Future<RecordMetadata> future = last.tryAppend(record, callback);
Future<RecordMetadata> future = null;
try {
future = last.tryAppend(record, callback, this.largeMessagePolicy);
} catch (RecordTooLargeException e) {
// OK: the record may be too large only for this partially-filled batch; fall through and try a fresh one
}
if (future != null) {
return future;
}
@@ -110,12 +124,18 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t
// Create a new batch because previous one has no space
BytesBoundedBatch batch = new BytesBoundedBatch(this.memSizeLimit, this.expireInMilliSecond);
LOG.debug("Batch " + batch.getId() + " is generated");
Future<RecordMetadata> future = batch.tryAppend(record, callback);
Future<RecordMetadata> future = null;
try {
future = batch.tryAppend(record, callback, this.largeMessagePolicy);
} catch (RecordTooLargeException e) {
// If even a new batch was unable to accommodate the message
throw new RuntimeException("Failed due to a message that was too large", e);
}

// Even a single record can exceed the batch size limit
// Ignore the record because Eventhub can only accept payloads smaller than 256KB
// The future might be null, since the largeMessagePolicy might be set to DROP
if (future == null) {
LOG.error("Batch " + batch.getId() + " is marked as complete because it contains a huge record: "
assert largeMessagePolicy.equals(LargeMessagePolicy.DROP);
LOG.error("Batch " + batch.getId() + " is silently marked as complete, dropping a huge record: "
+ record);
future = Futures.immediateFuture(new RecordMetadata(0));
callback.onSuccess(WriteResponse.EMPTY);
@@ -124,6 +144,7 @@ public final Future<RecordMetadata> enqueue (D record, WriteCallback callback) t

// if queue is full, we should not add more
while (dq.size() >= this.capacity) {
LOG.debug("Accumulator size {} is greater than capacity {}, waiting", dq.size(), this.capacity);
this.notFull.await();
}
dq.addLast(batch);
@@ -187,7 +208,7 @@ public Batch<D> getNextAvailableBatch () {
return dq.poll();
} else {
while (dq.size() == 0) {
LOG.info ("ready to sleep because of queue is empty");
LOG.debug ("ready to sleep because of queue is empty");
SequentialBasedBatchAccumulator.this.notEmpty.await();
if (SequentialBasedBatchAccumulator.this.isClosed()) {
return dq.poll();
@@ -203,7 +224,7 @@ public Batch<D> getNextAvailableBatch () {

if (dq.size() == 1) {
if (dq.peekFirst().isTTLExpire()) {
LOG.info ("Batch " + dq.peekFirst().getId() + " is expired");
LOG.debug ("Batch " + dq.peekFirst().getId() + " is expired");
BytesBoundedBatch candidate = dq.poll();
SequentialBasedBatchAccumulator.this.notFull.signal();
return candidate;
@@ -240,12 +261,16 @@ public void close() {
public void flush() {
try {
ArrayList<Batch> batches = this.incomplete.all();
LOG.info ("flush on {} batches", batches.size());
int numOutstandingRecords = 0;
for (Batch batch: batches) {
numOutstandingRecords += batch.getRecords().size();
}
LOG.debug ("Flush called on {} batches with {} records total", batches.size(), numOutstandingRecords);
for (Batch batch: batches) {
batch.await();
}
} catch (Exception e) {
LOG.info ("Error happens when flushing");
LOG.error ("Error happened while flushing batches");
}
}

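Tying the new constructors and policy plumbing together, a hedged usage sketch of the accumulator (the record type, sizes, and callback bodies are illustrative, and enqueue's throws clause is truncated in this diff, so the sketch catches Exception broadly):

// Minimal usage sketch, not code from this commit.
SequentialBasedBatchAccumulator<byte[]> accumulator =
    new SequentialBasedBatchAccumulator<>(
        256 * 1024L,  // batchSizeLimit: 256 KB per batch
        1000L,        // expireInMilliSecond: seal an idle batch after 1 s
        10L,          // capacity: at most 10 queued batches before enqueue blocks
        LargeMessagePolicy.DROP);
try {
  Future<RecordMetadata> future = accumulator.enqueue(
      "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8),
      new WriteCallback<Object>() {
        @Override public void onSuccess(WriteResponse writeResponse) { /* record acked */ }
        @Override public void onFailure(Throwable throwable) { /* record failed */ }
      });
} catch (Exception e) {
  // enqueue's exact throws clause is elided in this diff
}
accumulator.flush(); // blocks until all incomplete batches are acknowledged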
1 change: 1 addition & 0 deletions gobblin-distribution/gobblin-flavor-standard.gradle
@@ -21,4 +21,5 @@ dependencies {
compile project(':gobblin-modules:gobblin-crypto-provider')
compile project(':gobblin-modules:gobblin-kafka-08')
compile project(':gobblin-modules:google-ingestion')
compile project(':gobblin-modules:gobblin-elasticsearch')
}
64 changes: 64 additions & 0 deletions gobblin-example/src/main/resources/wikipedia-elastic.conf
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A sample pull file that pulls recent revisions of a Wikipedia page
# and writes them to Elasticsearch
job {
name=PullFromWikipediaToElasticSearch
group=Wikipedia
description=Pull from Wikipedia and write to ElasticSearch
}

task.maxretries=0

source {
class=org.apache.gobblin.example.wikipedia.WikipediaSource
page.titles="Wikipedia:Sandbox"
revisions.cnt=5
}

wikipedia {
api.rooturl="https://en.wikipedia.org/w/api.php"
avro.schema="{\"namespace\": \"example.wikipedia.avro\",\"type\": \"record\",\"name\": \"WikipediaArticle\",\"fields\": [{\"name\": \"revid\", \"type\": [\"double\", \"null\"]},{\"name\": \"pageid\", \"type\": [\"double\", \"null\"]},{\"name\": \"title\", \"type\": [\"string\", \"null\"]},{\"name\": \"user\", \"type\": [\"string\", \"null\"]},{\"name\": \"anon\", \"type\": [\"string\", \"null\"]},{\"name\": \"userid\", \"type\": [\"double\", \"null\"]},{\"name\": \"timestamp\", \"type\": [\"string\", \"null\"]},{\"name\": \"size\", \"type\": [\"double\", \"null\"]},{\"name\": \"contentformat\", \"type\": [\"string\", \"null\"]},{\"name\": \"contentmodel\", \"type\": [\"string\", \"null\"]},{\"name\": \"content\", \"type\": [\"string\", \"null\"]}]}"
}
converter.classes=org.apache.gobblin.example.wikipedia.WikipediaConverter
extract.namespace=org.apache.gobblin.example.wikipedia

writer {
builder.class=org.apache.gobblin.elasticsearch.writer.ElasticsearchDataWriterBuilder
elasticsearch {
client.type=REST
index.name=wikipedia-test
index.type=docs
#hosts=hostname
#ssl {
# enabled=true
# keystoreType=pkcs12
# keystorePassword=change_me
# keystoreLocation=/path/to/.p12 file
# truststoreType=jks
# truststoreLocation=/path/to/cacerts
# truststorePassword=changeme
#}
typeMapperClass=org.apache.gobblin.elasticsearch.typemapping.AvroGenericRecordTypeMapper
useIdFromData=false # change to true if you want to use a field from the record as the id field
#idFieldName=id # change to the field of the record that you want to use as the id of the document
}
}

data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
