Java Client: MessageImpl - ensure that AutoConsumeSchema downloaded the schema before decoding the payload #10248

Merged
merged 52 commits into from
Apr 19, 2021
Changes from 49 commits
3bbf7aa
Pulsar IO: Allow to develop Sinks that support Schema but without set…
eolivelli Apr 8, 2021
db5270b
Add integration tests
eolivelli Apr 8, 2021
742473e
Fix build error
eolivelli Apr 8, 2021
d5a791f
fix test
eolivelli Apr 8, 2021
115696c
remove file committed by mistake
eolivelli Apr 9, 2021
4f9a25a
Merge branch 'impl/sink-object' into impl/sink-keyvalue
eolivelli Apr 10, 2021
c0dc9f6
Allow Sink<GenericObject> to work with KeyValue payloads
eolivelli Apr 10, 2021
af7f311
fix
eolivelli Apr 10, 2021
5817edc
more fixes
eolivelli Apr 10, 2021
2c50da8
revert
eolivelli Apr 10, 2021
e42c6c6
remove a few CI jobs
eolivelli Apr 10, 2021
6cac24b
dump sink logs and add a fix
eolivelli Apr 10, 2021
a875ccd
fix build issue
eolivelli Apr 10, 2021
2fa99cc
add debug
eolivelli Apr 10, 2021
ee24d89
Remove SEPARATED KV, they need more plumbing
eolivelli Apr 11, 2021
fc8e253
remove logger
eolivelli Apr 11, 2021
ba1a869
Merge branch 'master' into impl/sink-keyvalue
eolivelli Apr 12, 2021
4ef8b1e
Merge branch 'master' into impl/sink-keyvalue
eolivelli Apr 12, 2021
3011572
more complicated test
eolivelli Apr 12, 2021
2c7847b
fix download logs error
eolivelli Apr 12, 2021
59e2638
remove useless tests
eolivelli Apr 12, 2021
bcba2c8
disable some tests
eolivelli Apr 12, 2021
5befc71
Merge branch 'master' into impl/sink-keyvalue
eolivelli Apr 13, 2021
267bb04
Restore CI jobs
eolivelli Apr 13, 2021
3532f8d
restore file
eolivelli Apr 13, 2021
932ff4d
Merge remote-tracking branch 'origin/master' into impl/sink-keyvalue
eolivelli Apr 14, 2021
4d8e28f
Try to fetch schema in getSchemaInfo
eolivelli Apr 14, 2021
342c3f2
Do not launch many sinks in parallel
eolivelli Apr 14, 2021
1e9714f
Merge remote-tracking branch 'origin/master' into impl/sink-keyvalue
eolivelli Apr 15, 2021
22257d4
reduce number of sinks and add better error message
eolivelli Apr 15, 2021
f4a0932
reduce test number
eolivelli Apr 15, 2021
ba7c713
simplify and debug CI
eolivelli Apr 15, 2021
4d4dda2
more debug
eolivelli Apr 15, 2021
c732dba
do not copy the full log
eolivelli Apr 15, 2021
640a1fe
Fetch schema in AutoConsumeSchema#getSchemaInfo
eolivelli Apr 16, 2021
37a3dec
Merge remote-tracking branch 'origin/master' into fix/get-schema-stuck
eolivelli Apr 16, 2021
8d46853
remove debug
eolivelli Apr 16, 2021
6664ae5
add debug
eolivelli Apr 16, 2021
78aa4b1
remove useless CI jobs
eolivelli Apr 16, 2021
be30806
Revert debug and fix the problem
eolivelli Apr 16, 2021
499ae85
revert
eolivelli Apr 16, 2021
43becbd
javadoc
eolivelli Apr 16, 2021
44973b8
revert debug
eolivelli Apr 16, 2021
9e7b718
Merge remote-tracking branch 'origin/master' into fix/get-schema-stuck
eolivelli Apr 17, 2021
b0bc40e
add test
eolivelli Apr 17, 2021
bf5543d
fix build
eolivelli Apr 17, 2021
6656fbc
Run only one sink
eolivelli Apr 17, 2021
e7310f7
less records
eolivelli Apr 17, 2021
29c898e
explicit ack
eolivelli Apr 17, 2021
3cee2b6
launch only one sink
eolivelli Apr 18, 2021
de99cae
fix assertion
eolivelli Apr 18, 2021
e86dbb8
fix test
eolivelli Apr 18, 2021
@@ -1707,9 +1707,15 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
if (log.isDebugEnabled()) {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
remoteAddress, new String(commandGetSchema.getSchemaVersion()),
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
if (commandGetSchema.hasSchemaVersion()) {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
Review comment (Contributor, Author):

When debug logging is enabled in the broker, getSchema() fails because getSchemaVersion() is called even when hasSchemaVersion() returns false. The fix is added here.

remoteAddress, new String(commandGetSchema.getSchemaVersion()),
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
} else {
log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
remoteAddress, null,
commandGetSchema.getTopic(), commandGetSchema.getRequestId());
}
}

long requestId = commandGetSchema.getRequestId();
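
The guard in this hunk can be illustrated in isolation. The following is a minimal sketch, not the broker's code: `GetSchemaRequest` and `describeVersion` are hypothetical names standing in for the generated `CommandGetSchema` class and the logging call, and the sketch assumes (as the review comment describes) that reading an unset optional field raises an exception.

```java
import java.nio.charset.StandardCharsets;

public class SchemaVersionLogging {

    /** Hypothetical stand-in for a generated protocol message with an optional field. */
    static final class GetSchemaRequest {
        private final byte[] schemaVersion; // null models "field not set"

        GetSchemaRequest(byte[] schemaVersion) {
            this.schemaVersion = schemaVersion;
        }

        boolean hasSchemaVersion() {
            return schemaVersion != null;
        }

        byte[] getSchemaVersion() {
            if (schemaVersion == null) {
                // Assumption: the accessor fails when the optional field is unset.
                throw new IllegalStateException("Field 'schema_version' is not set");
            }
            return schemaVersion;
        }
    }

    /** Returns a loggable version string, or null when the field is absent. */
    static String describeVersion(GetSchemaRequest request) {
        return request.hasSchemaVersion()
                ? new String(request.getSchemaVersion(), StandardCharsets.UTF_8)
                : null;
    }

    public static void main(String[] args) {
        byte[] version = "v1".getBytes(StandardCharsets.UTF_8);
        System.out.println(describeVersion(new GetSchemaRequest(version)));
        System.out.println(describeVersion(new GetSchemaRequest(null)));
    }
}
```

Checking `hasSchemaVersion()` first keeps the debug path from throwing, so enabling debug logging no longer changes the behavior of the request handler.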
@@ -50,6 +50,7 @@
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class MessageImpl<T> implements Message<T> {
@@ -333,9 +334,17 @@ public byte[] getSchemaVersion() {
}
}

private SchemaInfo getSchemaInfo() {
if (schema instanceof AutoConsumeSchema) {
((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
}
return schema.getSchemaInfo();
}

@Override
public T getValue() {
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
SchemaInfo schemaInfo = getSchemaInfo();
if (schemaInfo != null && SchemaType.KEY_VALUE == schemaInfo.getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
@@ -21,7 +21,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
@@ -31,7 +30,10 @@
import org.apache.pulsar.common.schema.SchemaType;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.google.common.base.Preconditions.checkState;

@@ -78,27 +80,7 @@ public boolean supportSchemaVersioning() {

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
if (schema == null) {
Review comment (Contributor, Author):

I have just moved this code into a separate method.

SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoProvider.getLatestSchema().get();
if (schemaInfo == null) {
// schemaless topic
schemaInfo = BytesSchema.of().getSchemaInfo();
}
} catch (InterruptedException | ExecutionException e ) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("Can't get last schema for topic {} use AutoConsumeSchema", topicName);
throw new SchemaSerializationException(e.getCause());
}
// schemaInfo null means that there is no schema attached to the topic.
schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
fetchSchemaIfNeeded();
ensureSchemaInitialized();
return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
}
@@ -144,7 +126,7 @@ public Optional<Object> getNativeSchema() {
}
}

private Schema<?> generateSchema(SchemaInfo schemaInfo) {
private static Schema<?> generateSchema(SchemaInfo schemaInfo) {
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
final boolean useProvidedSchemaAsReaderSchema = false;
@@ -247,4 +229,47 @@ public static GenericRecord wrapPrimitiveObject(Object value, SchemaType type, b
public Schema<?> getInternalSchema() {
return schema;
}

/**
* It may happen that the schema is not loaded but we need it, for instance in order to call getSchemaInfo()
* We cannot call this method in getSchemaInfo, because getSchemaInfo is called in many
* places and we will introduce lots of deadlocks.
*/
public void fetchSchemaIfNeeded() throws SchemaSerializationException {
if (schema == null) {
if (schemaInfoProvider == null) {
throw new SchemaSerializationException("Can't get accurate schema information for topic " + topicName +
"using AutoConsumeSchema because SchemaInfoProvider is not set yet");
} else {
SchemaInfo schemaInfo = null;
try {
schemaInfo = schemaInfoProvider.getLatestSchema().get();
if (schemaInfo == null) {
// schemaless topic
schemaInfo = BytesSchema.of().getSchemaInfo();
}
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
log.error("Can't get last schema for topic {} using AutoConsumeSchema", topicName);
throw new SchemaSerializationException(e.getCause());
}
// schemaInfo null means that there is no schema attached to the topic.
schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName, schemaInfo.getSchemaDefinition());
}
}
}

@Override
public String toString() {
if (schema != null && schema.getSchemaInfo() != null) {
return "AUTO_CONSUME(schematype=" + schema.getSchemaInfo().getType() + ")";
} else {
return "AUTO_CONSUME(uninitialized)";
}
}
}
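
The split between a blocking `fetchSchemaIfNeeded()` and a non-blocking `getSchemaInfo()` can be sketched on its own. This is an illustrative model under stated assumptions, not the Pulsar implementation: `LazySchemaHolder`, `getSchemaOrNull` and `BYTES_FALLBACK` are hypothetical names, a plain `String` stands in for `SchemaInfo`, and the class is not thread-safe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class LazySchemaHolder {

    /** Placeholder used when the topic has no schema (hypothetical name). */
    static final String BYTES_FALLBACK = "BYTES";

    private final Supplier<CompletableFuture<String>> provider; // may be null until configured
    private String schema; // null until the first successful fetch

    LazySchemaHolder(Supplier<CompletableFuture<String>> provider) {
        this.provider = provider;
    }

    /** Blocks until the schema is known; does nothing if it is already loaded. */
    void fetchSchemaIfNeeded() {
        if (schema != null) {
            return;
        }
        if (provider == null) {
            throw new IllegalStateException("schema provider is not set yet");
        }
        try {
            String fetched = provider.get().get();
            // A null result models a topic without a schema.
            schema = (fetched != null) ? fetched : BYTES_FALLBACK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while fetching schema", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("schema fetch failed", e.getCause());
        }
    }

    /** Non-blocking accessor: returns null while the schema is not loaded. */
    String getSchemaOrNull() {
        return schema;
    }

    @Override
    public String toString() {
        return schema != null
                ? "AUTO_CONSUME(schematype=" + schema + ")"
                : "AUTO_CONSUME(uninitialized)";
    }

    public static void main(String[] args) {
        LazySchemaHolder holder =
                new LazySchemaHolder(() -> CompletableFuture.completedFuture("AVRO"));
        System.out.println(holder);   // state before the fetch
        holder.fetchSchemaIfNeeded(); // caller decides when blocking is acceptable
        System.out.println(holder);   // state after the fetch
    }
}
```

Keeping the blocking call out of the accessor mirrors the concern in the javadoc: the accessor is called from many places, so only callers that can safely wait, such as `MessageImpl#getValue` in this PR, trigger the fetch explicitly.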
@@ -65,6 +65,8 @@ public void write(Record<GenericObject> record) {
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());

record.ack();
}

@Override
@@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
@@ -89,6 +90,9 @@ public void testGenericObjectSink() throws Exception {
.serviceUrl(container.getPlainTextServiceUrl())
.build();

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build();

// we are not using a parametrized test in order to save resources
// we create N sinks, send the records and verify each sink
// sinks execution happens in parallel
@@ -97,67 +101,61 @@ public void testGenericObjectSink() throws Exception {
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
new SinkSpec("test-kv-sink-input-kv-avro-json-sep-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
Review comment (Contributor, Author):

This is the new test case that covers the change.

);
// submit all sinks

final int numRecords = 2;

for (SinkSpec spec : specs) {
submitSinkConnector(spec.sinkName, spec.outputTopicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", JAVAJAR);
}
// check all sinks
for (SinkSpec spec : specs) {

// get sink info
getSinkInfoSuccess(spec.sinkName);
getSinkStatus(spec.sinkName);
}

final int numRecords = 10;

for (SinkSpec spec : specs) {
@Cleanup Producer<Object> producer = client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType", spec.schema.getSchemaInfo().getType().toString())
.property("recordNumber", i + "")
.send();
log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
}
}

// wait that all sinks processed all records without errors
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
for (SinkSpec spec : specs) {
try {
log.info("waiting for sink {}", spec.sinkName);
for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
if (instance.getStatus().numWrittenToSink >= numRecords) {
break;
}
assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
Thread.sleep(1000);
}

// wait that all sinks processed all records without errors

try {
log.info("waiting for sink {}", spec.sinkName);
for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
log.info("sink {} is okay", spec.sinkName);
} finally {
dumpSinkLogs(spec);
SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
if (instance.getStatus().numWrittenToSink >= numRecords) {
break;
}
assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
Thread.sleep(1000);
}
}
}

SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
log.info("sink {} is okay", spec.sinkName);
} finally {
dumpSinkLogs(spec);
}

for (SinkSpec spec : specs) {
deleteSink(spec.sinkName);
getSinkInfoNotFound(spec.sinkName);
}
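
The wait loop in this test follows a bounded-polling pattern: check a counter, sleep, and give up after a fixed number of attempts. A minimal standalone sketch of that pattern is shown below. `SinkPolling` and `waitForCount` are hypothetical names, and an `IntSupplier` stands in for the `numWrittenToSink` value that the test reads through the admin API.

```java
import java.util.function.IntSupplier;

public class SinkPolling {

    /**
     * Polls the counter until it reaches the expected value or the attempts run out.
     * Returns true when the threshold was reached, false on timeout or interruption.
     */
    static boolean waitForCount(IntSupplier writtenCount, int expected,
                                int maxAttempts, long sleepMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            if (writtenCount.getAsInt() >= expected) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        // One last check after the final sleep.
        return writtenCount.getAsInt() >= expected;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        // Simulated status source: the count grows by one on each poll.
        boolean done = waitForCount(() -> ++counter[0], 2, 120, 10);
        System.out.println("done=" + done);
    }
}
```

Returning a boolean leaves the final assertions to the caller, which is how the test above is structured: the loop only waits, and the checks on `numWrittenToSink`, `numSinkExceptions` and `numSystemExceptions` run afterwards.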