change following sijie's comments
jiazhai committed Nov 5, 2018
1 parent 4fc4ea0 commit d7aec0b
Showing 3 changed files with 134 additions and 79 deletions.
@@ -31,6 +31,7 @@
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
+ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.SerDe;
@@ -102,6 +103,8 @@ private static SchemaType getDefaultSchemaType(Class<?> clazz) {
return SchemaType.STRING;
} else if (isProtobufClass(clazz)) {
return SchemaType.PROTOBUF;
+ } else if (KeyValue.class.equals(clazz)) {
+ return SchemaType.KEY_VALUE;
} else {
return DEFAULT_SCHEMA_TYPE;
}
@@ -126,6 +129,9 @@ private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type)
case JSON:
return JSONSchema.of(clazz);

+ case KEY_VALUE:
+ return (Schema<T>)Schema.KV_BYTES;
+
case PROTOBUF:
return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap());

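For orientation, a hedged sketch of what the two new branches add up to. getDefaultSchemaType and newSchemaInstance are private helpers of the class patched above, so this is illustrative rather than a public API call, and the helper method below is hypothetical:

// Hypothetical helper, for illustration only; it could only live inside the same utils class.
private static Schema<KeyValue> defaultKeyValueSchema() {
    SchemaType type = getDefaultSchemaType(KeyValue.class);   // resolves to SchemaType.KEY_VALUE (new branch)
    return newSchemaInstance(KeyValue.class, type);           // resolves to Schema.KV_BYTES (new branch)
}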
@@ -21,12 +21,16 @@
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
- import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -64,7 +68,8 @@ public class KafkaConnectSource implements Source<KeyValue<byte[], byte[]>> {
private OffsetStorageReader offsetReader;
@Getter
private OffsetStorageWriter offsetWriter;
- private IdentityHashMap<SourceRecord, SourceRecord> outstandingRecords = new IdentityHashMap<>();
+ // number of outstandingRecords that have been polled but not been acked
+ private AtomicInteger outstandingRecords = new AtomicInteger(0);

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
@@ -81,7 +86,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
.getDeclaredConstructor()
.newInstance();


// initialize the key and value converter
keyConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)))
.asSubclass(Converter.class)
@@ -119,114 +123,146 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
}

@Override
- public Record<KeyValue<byte[], byte[]>> read() throws Exception {
+ public synchronized Record<KeyValue<byte[], byte[]>> read() throws Exception {
while (true) {
if (currentBatch == null) {
flushFuture = new CompletableFuture<>();
List<SourceRecord> recordList = sourceTask.poll();
- if (recordList == null) {
+ if (recordList == null || recordList.isEmpty()) {
Thread.sleep(1000);
continue;
}
+ outstandingRecords.addAndGet(recordList.size());
currentBatch = recordList.iterator();
}
if (currentBatch.hasNext()) {
return processSourceRecord(currentBatch.next());
} else {
- boolean hasOutstandingRecords;
- synchronized (this) {
- hasOutstandingRecords = !outstandingRecords.isEmpty();
- }
- if (!hasOutstandingRecords) {
- // there is no records any more, then waiting for the batch to complete writing
- // to sink and the offsets are committed as well
- flushFuture.get();
- flushFuture = null;
- }
+ // there is no records any more, then waiting for the batch to complete writing
+ // to sink and the offsets are committed as well, then do next round read.
+ flushFuture.get();
+ flushFuture = null;
currentBatch = null;
}
}
}
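The reworked read()/ack() pair coordinates through two fields: an AtomicInteger counting records that have been polled but not yet acked, and a CompletableFuture that read() blocks on once the current batch is exhausted. A stand-alone, hedged sketch of that pattern follows, assuming nothing beyond java.util.concurrent; the class and method names below are illustrative, not from the commit:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class BatchAckTracker {
    private final AtomicInteger outstanding = new AtomicInteger(0);
    private volatile CompletableFuture<Void> flushFuture = new CompletableFuture<>();

    void onBatchPolled(List<?> batch) {
        flushFuture = new CompletableFuture<>();      // read() side: a fresh future per batch
        outstanding.addAndGet(batch.size());          // count every record handed out
    }

    void onRecordAcked() {
        if (outstanding.decrementAndGet() == 0) {     // ack() side: last ack of the batch
            flushFuture.complete(null);               // the real code flushes offsets before completing
        }
    }

    void awaitBatchCompletion() throws Exception {
        flushFuture.get();                            // read() blocks here before polling the next batch
    }
}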

- @Override
- public void close() {
- sourceTask.stop();
- }

private synchronized Record<KeyValue<byte[], byte[]>> processSourceRecord(final SourceRecord srcRecord) {
- outstandingRecords.put(srcRecord, srcRecord);
+ KafkaSourceRecord record = new KafkaSourceRecord(srcRecord);
offsetWriter.offset(srcRecord.sourcePartition(), srcRecord.sourceOffset());
- return new Record<KeyValue<byte[], byte[]>>() {
- @Override
- public Optional<String> getKey() {
- byte[] keyBytes = keyConverter.fromConnectData(
- srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
- return Optional.of(Base64.getEncoder().encodeToString(keyBytes));
- }
+ return record;
+ }

- @Override
- public KeyValue<byte[], byte[]> getValue() {
- byte[] keyBytes = keyConverter.fromConnectData(
- srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
- byte[] valueBytes = valueConverter.fromConnectData(
- srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
- return new KeyValue(keyBytes, valueBytes);
- }
+ private static Map<String, String> PROPERTIES = Collections.emptyMap();
+ private static Optional<Long> RECORD_SEQUENCE = Optional.empty();
+ private static long FLUSH_TIMEOUT_MS = 2000;

- @Override
- public Optional<String> getTopicName() {
- return Optional.of(srcRecord.topic());
- }
+ private class KafkaSourceRecord implements Record<KeyValue<byte[], byte[]>> {
+ @Getter
+ Optional<String> key;
+ @Getter
+ KeyValue<byte[], byte[]> value;
+ @Getter
+ Optional<String> topicName;
+ @Getter
+ Optional<Long> eventTime;
+ @Getter
+ Optional<String> partitionId;
+ @Getter
+ Optional<String> destinationTopic;

- @Override
- public Optional<Long> getEventTime() {
- return Optional.ofNullable(srcRecord.timestamp());
+ KafkaSourceRecord(SourceRecord srcRecord) {
+ byte[] keyBytes = keyConverter.fromConnectData(
+ srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
+ byte[] valueBytes = valueConverter.fromConnectData(
+ srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
+ if (keyBytes != null) {
+ this.key = Optional.of(Base64.getEncoder().encodeToString(keyBytes));
+ }
+ this.value = new KeyValue(keyBytes, valueBytes);

- @Override
- public Optional<String> getPartitionId() {
- String partitionId = srcRecord.sourcePartition()
- .entrySet()
- .stream()
- .map(e -> e.getKey() + "=" + e.getValue())
- .collect(Collectors.joining(","));
- return Optional.of(partitionId);
- }
+ this.topicName = Optional.of(srcRecord.topic());
+ this.eventTime = Optional.ofNullable(srcRecord.timestamp());
+ this.partitionId = Optional.of(srcRecord.sourcePartition()
+ .entrySet()
+ .stream()
+ .map(e -> e.getKey() + "=" + e.getValue())
+ .collect(Collectors.joining(",")));
+ this.destinationTopic = Optional.of(srcRecord.topic());
+ }

- @Override
- public Optional<Long> getRecordSequence() {
- return Optional.empty();
- }
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return RECORD_SEQUENCE;
+ }

- @Override
- public Map<String, String> getProperties() {
- return Collections.emptyMap();
+ @Override
+ public Map<String, String> getProperties() {
+ return PROPERTIES;
}

+ private void completedFlushOffset(Throwable error, Void result) {
+ if (error != null) {
+ log.error("Failed to flush offsets to storage: ", error);
+ currentBatch = null;
+ offsetWriter.cancelFlush();
+ flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
+ } else {
+ log.trace("Finished flushing offsets to storage");
+ currentBatch = null;
+ flushFuture.complete(null);
+ }
+ }
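The completedFlushOffset(Throwable, Void) method above is handed to offsetWriter.doFlush(...) in ack() below as a method reference; it fits Kafka Connect's generic callback shape, which, to the best of my knowledge of the Kafka Connect runtime, is:

// org.apache.kafka.connect.util.Callback, shown here for reference only.
public interface Callback<V> {
    void onCompletion(Throwable error, V result);
}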

- @Override
- public void ack() {
- boolean canComplete;
- synchronized (KafkaConnectSource.this) {
- outstandingRecords.remove(srcRecord);
- canComplete = outstandingRecords.isEmpty();
- }
- if (canComplete && flushFuture != null) {
- flushFuture.complete(null);
- }
- }
-
- @Override
- public void fail() {
- if (flushFuture != null) {
- flushFuture.completeExceptionally(new Exception("Sink Error"));
- }
- }
-
- @Override
- public Optional<String> getDestinationTopic() {
- return Optional.of(srcRecord.topic());
- }
- };
- }
+ @Override
+ public void ack() {
+ // TODO: should flush for each batch. not wait for a time for acked all.
+ // How to handle order between each batch. QueueList<pair<batch, automicInt>>. check if head is all acked.
+ boolean canFlush = (outstandingRecords.decrementAndGet() == 0);
+
+ // consumed all the records, flush the offsets
+ if (canFlush && flushFuture != null) {
+ if (!offsetWriter.beginFlush()) {
+ log.error("When beginFlush, No offsets to commit!");
+ flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
+ return;
+ }
+
+ Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
+ if (doFlush == null) {
+ // Offsets added in processSourceRecord, But here no offsets to commit
+ log.error("No offsets to commit!");
+ flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
+ return;
+ }
+
+ // Wait until the offsets are flushed
+ try {
+ doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ sourceTask.commit();
+ } catch (InterruptedException e) {
+ log.warn("Flush of {} offsets interrupted, cancelling", this);
+ offsetWriter.cancelFlush();
+ } catch (ExecutionException e) {
+ log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
+ offsetWriter.cancelFlush();
+ } catch (TimeoutException e) {
+ log.error("Timed out waiting to flush {} offsets to storage", this);
+ offsetWriter.cancelFlush();
+ }
+ }
+ }
+
+ @Override
+ public void fail() {
+ if (flushFuture != null) {
+ flushFuture.completeExceptionally(new Exception("Sink Error"));
+ }
+ }
+ }


+ @Override
+ public void close() throws Exception {
+ sourceTask.stop();
+ }
}
}
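To show how the reworked contract is meant to be driven end to end, a hedged driver sketch follows. Nothing here is from the commit: the class name, loop shape, and import paths are assumptions; only open/read/ack/fail/close mirror the code above.

import java.util.Map;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;

public class KafkaConnectSourceDriver {
    public static void run(Map<String, Object> config, SourceContext ctx) throws Exception {
        KafkaConnectSource source = new KafkaConnectSource();
        source.open(config, ctx);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until the connector polls a non-empty batch.
                Record<KeyValue<byte[], byte[]>> record = source.read();
                try {
                    // Hand the record to the framework / a Pulsar topic here (omitted).
                    record.ack();   // the last ack of a batch triggers the Kafka Connect offset flush
                } catch (Exception e) {
                    record.fail();  // completes flushFuture exceptionally so read() surfaces the error
                }
            }
        } finally {
            source.close();
        }
    }
}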
@@ -99,8 +99,6 @@ public void testOpenAndRead() throws Exception {
os.flush();
log.info("write 2 lines.");

- // offset of second line
- long offset = tempFile.getTotalSpace();
String line2 = "This is the second line\n";
os.write(line2.getBytes());
os.flush();
@@ -112,10 +110,25 @@
String readBack1 = new String(record.getValue().getValue());
assertTrue(line1.contains(readBack1));
assertEquals(record.getValue().getKey(), null);
log.info("read line1: {}", readBack1);
record.ack();

record = kafkaConnectSource.read();
String readBack2 = new String(record.getValue().getValue());
assertTrue(line2.contains(readBack2));
assertEquals(record.getValue().getKey(), null);
log.info("read line2: {}", readBack2);
record.ack();

+ String line3 = "This is the 3rd line\n";
+ os.write(line3.getBytes());
+ os.flush();
+
+ record = kafkaConnectSource.read();
+ String readBack3 = new String(record.getValue().getValue());
+ assertTrue(line3.contains(readBack3));
+ assertEquals(record.getValue().getKey(), null);
+ log.info("read line3: {}", readBack3);
+ record.ack();
}
}
