[Pulsar SQL] Fix Pulsar SQL query bytes schema data error #9631

Merged
merged 8 commits into apache:master from pulsar-sql-bytes-schema-fix
Feb 25, 2021

@gaoran10 gaoran10 commented Feb 19, 2021

Motivation

Currently, querying bytes schema data with Pulsar SQL causes an error.

Reproduce

  1. Produce data with the bytes schema.
  2. Query the data with Pulsar SQL.
  3. The error below appears in the log.

Error log

com.google.common.util.concurrent.UncheckedExecutionException: java.nio.BufferUnderflowException
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.getSchemaByVersion(PulsarSqlSchemaInfoProvider.java:76)
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.advanceNextPosition(PulsarRecordCursor.java:471)
	at io.prestosql.spi.connector.RecordPageSource.getNextPage(RecordPageSource.java:90)
	at io.prestosql.operator.TableScanOperator.getOutput(TableScanOperator.java:302)
	at io.prestosql.operator.Driver.processInternal(Driver.java:379)
	at io.prestosql.operator.Driver.lambda$processFor$8(Driver.java:283)
	at io.prestosql.operator.Driver.tryWithLock(Driver.java:675)
	at io.prestosql.operator.Driver.processFor(Driver.java:276)
	at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1075)
	at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
	at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
	at io.prestosql.$gen.Presto_332__testversion____20210219_094906_2.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.BufferUnderflowException
	at java.nio.Buffer.nextGetIndex(Buffer.java:509)
	at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:415)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.loadSchema(PulsarSqlSchemaInfoProvider.java:106)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider.access$000(PulsarSqlSchemaInfoProvider.java:49)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:61)
	at org.apache.pulsar.sql.presto.PulsarSqlSchemaInfoProvider$1.load(PulsarSqlSchemaInfoProvider.java:58)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
	... 18 more

Modifications

Add a check for the bytes schema: if the topic uses the bytes schema, use the schema info of the bytes schema directly instead of looking it up by schema version.
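
The idea can be sketched without any Pulsar dependency. This is a minimal illustration, not the code in this PR: the class `BytesSchemaShortCircuit`, the method `schemaInfoFor`, the `isBytesSchema` flag, and the string used as schema info are all hypothetical stand-ins. It only shows that the bytes path returns before the version bytes are decoded.

```java
import java.nio.ByteBuffer;
import java.util.function.LongFunction;

public class BytesSchemaShortCircuit {

    // Hypothetical stand-in for the schema info of the bytes schema.
    static final String BYTES_SCHEMA_INFO = "bytes-schema-info";

    // Returns the bytes schema info directly for a bytes-schema topic.
    // Otherwise decodes the 8-byte version and delegates to the lookup.
    static String schemaInfoFor(boolean isBytesSchema,
                                byte[] versionBytes,
                                LongFunction<String> lookupByVersion) {
        if (isBytesSchema) {
            // The version bytes are never decoded on this path.
            return BYTES_SCHEMA_INFO;
        }
        long version = ByteBuffer.wrap(versionBytes).getLong();
        return lookupByVersion.apply(version);
    }

    public static void main(String[] args) {
        // Bytes schema with an empty version: no decoding, no exception.
        System.out.println(schemaInfoFor(true, new byte[0], v -> "schema-v" + v));

        // Any other schema: decode the version and look it up.
        byte[] version = ByteBuffer.allocate(Long.BYTES).putLong(5L).array();
        System.out.println(schemaInfoFor(false, version, v -> "schema-v" + v));
    }
}
```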

Verifying this change

Add a new integration test for different schemas.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

@gaoran10
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui codelipenghui added this to the 2.8.0 milestone Feb 21, 2021
@codelipenghui codelipenghui added the area/sql and type/bug labels Feb 21, 2021
@@ -441,9 +442,11 @@ public boolean advanceNextPosition() {
//start time for deseralizing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();

-SchemaInfo schemaInfo;
+SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
Contributor

I noticed that the current getSchemaVersion in RawMessageImpl returns null when the message metadata has no schema version:

@Override
public byte[] getSchemaVersion() {
    if (msgMetadata != null && msgMetadata.getMetadata().hasSchemaVersion()) {
        return msgMetadata.getMetadata().getSchemaVersion();
    } else {
        return null;
    }
}

It should not throw java.nio.BufferUnderflowException. Could you please confirm here?

Contributor Author

@gaoran10 gaoran10 Feb 23, 2021

The following method can throw java.nio.BufferUnderflowException:

private SchemaInfo loadSchema(BytesSchemaVersion bytesSchemaVersion) throws PulsarAdminException {
    ClassLoader originalContextLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
        return pulsarAdmin.schemas()
                .getSchemaInfo(topicName.toString(), ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
    } finally {
        Thread.currentThread().setContextClassLoader(originalContextLoader);
    }
}

The schema version of Schema.BYTES is an empty byte array, so ByteBuffer.wrap(bytesSchemaVersion.get()).getLong() throws java.nio.BufferUnderflowException.
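
The failure can be reproduced with the JDK alone. A minimal sketch: `getLong()` needs eight readable bytes, and an empty array provides none. The class `EmptyVersionDemo` and the helper `readVersion` are hypothetical names used only for this illustration; the decoding expression mirrors the one in loadSchema above.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

public class EmptyVersionDemo {

    // Decodes a version the same way loadSchema does: wrap the bytes and read one long.
    static long readVersion(byte[] versionBytes) {
        return ByteBuffer.wrap(versionBytes).getLong();
    }

    public static void main(String[] args) {
        // An 8-byte version decodes normally.
        byte[] eightBytes = ByteBuffer.allocate(Long.BYTES).putLong(3L).array();
        System.out.println("decoded version: " + readVersion(eightBytes));

        // An empty array has no bytes remaining, so getLong() throws.
        try {
            readVersion(new byte[0]);
        } catch (BufferUnderflowException e) {
            System.out.println("empty version: " + e.getClass().getName());
        }
    }
}
```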

Contributor

I see.

if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) {
    return null;
}
if (schemaName.equals(Schema.BYTES.getSchemaInfo().getName())) {
Member

Can you compare SchemaType instead of comparing schemaName?

Contributor Author

@gaoran10 gaoran10 Feb 23, 2021

Schema.BYTES and Schema.BYTEBUFFER both have the schema type SchemaType.BYTES, so I compare the schema name instead.
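
A small sketch of why the type alone cannot tell the two schemas apart. Everything here is a stand-in: the `SchemaType` enum, the `Info` class, the schema names "Bytes" and "ByteBuffer", and the fallback to the plain bytes schema are assumptions made for illustration, not the Pulsar classes or the exact logic in this PR.

```java
public class SchemaNameCheckDemo {

    // Hypothetical stand-in for Pulsar's SchemaType enum (only the values used here).
    enum SchemaType { NONE, BYTES, STRING }

    // Hypothetical stand-in for SchemaInfo: a name plus a type.
    static final class Info {
        final String name;
        final SchemaType type;

        Info(String name, SchemaType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Two distinct schemas that share one type. The names are assumed for illustration.
    static final Info BYTES = new Info("Bytes", SchemaType.BYTES);
    static final Info BYTEBUFFER = new Info("ByteBuffer", SchemaType.BYTES);

    // Returns null for non-bytes types, otherwise picks the schema by name.
    static Info resolve(SchemaType schemaType, String schemaName) {
        if (schemaType != SchemaType.BYTES && schemaType != SchemaType.NONE) {
            return null;
        }
        if (BYTEBUFFER.name.equals(schemaName)) {
            return BYTEBUFFER;
        }
        // Assumption: anything else on this path is treated as the plain bytes schema.
        return BYTES;
    }

    public static void main(String[] args) {
        // Comparing types cannot separate the two schemas.
        System.out.println("same type: " + (BYTES.type == BYTEBUFFER.type));
        // Comparing names can.
        System.out.println("resolved: " + resolve(SchemaType.BYTES, "ByteBuffer").name);
        System.out.println("resolved: " + resolve(SchemaType.BYTES, "Bytes").name);
    }
}
```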

Member

Okay. Makes sense.

@zymap
Member

zymap commented Feb 23, 2021

/pulsarbot run-failure-checks

@gaoran10
Contributor Author

/pulsarbot run-failure-checks

@zymap zymap merged commit 371b311 into apache:master Feb 25, 2021
@gaoran10 gaoran10 deleted the pulsar-sql-bytes-schema-fix branch February 25, 2021 12:47
@zymap
Member

zymap commented Mar 1, 2021

This issue was introduced by #8422, so it is not a bug fix for 2.7.0. Removed the release/2.7.1 label.
