Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar SQL Support Avro Schema ByteBuffer Type #6925

Merged

Conversation

gaoran10
Copy link
Contributor

@gaoran10 gaoran10 commented May 8, 2020

Fixes #6749

Motivation

Currently, the Pulsar SQL couldn't support AvroSchema use the ByteBuffer as the field type. For example, use the POJO class as below.

@Data
public static class LogFile {
    int id;
    String name;
    ByteBuffer data;
}

Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();

Error Log

2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null},{"name":"data","type":["null","bytes"],"default":null}]}', schemaType=AVRO, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=3359, endPositionLedgerId=3359, schemaInfoProperties={"__alwaysAllowNull":"true"}} (start = 4.34095226272178E8, wall = 546 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Modifications

When the presto field record type is VarbinaryType.VARBINARY, check the record type is ByteBuffer, byte[], ByteBuf or others, and to process the field record by the type.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Added unit test for getting bytes from VarbinaryType.VARBINARY type field record.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui added this to the 2.6.0 milestone May 9, 2020
@codelipenghui codelipenghui added the area/sql Pulsar SQL related features label May 9, 2020
@gaoran10
Copy link
Contributor Author

gaoran10 commented May 9, 2020

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@jiazhai
Copy link
Member

jiazhai commented May 10, 2020

/pulsarbot run-failure-checks

3 similar comments
@jiazhai
Copy link
Member

jiazhai commented May 10, 2020

/pulsarbot run-failure-checks

@gaoran10
Copy link
Contributor Author

/pulsarbot run-failure-checks

@jiazhai
Copy link
Member

jiazhai commented May 10, 2020

/pulsarbot run-failure-checks

@jiazhai jiazhai merged commit 3aaed24 into apache:master May 11, 2020
jiazhai pushed a commit that referenced this pull request May 12, 2020
Fixes #6749

### Motivation

Currently, the Pulsar SQL couldn't support AvroSchema use the `ByteBuffer` as the field type. For example, use the POJO class as below.

```
@DaTa
public static class LogFile {
    int id;
    String name;
    ByteBuffer data;
}

Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
```

Error Log
```
2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null},{"name":"data","type":["null","bytes"],"default":null}]}', schemaType=AVRO, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=3359, endPositionLedgerId=3359, schemaInfoProperties={"__alwaysAllowNull":"true"}} (start = 4.34095226272178E8, wall = 546 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Modifications

When the presto field record type is `VarbinaryType.VARBINARY`, check the record type is `ByteBuffer`, `byte[]`, `ByteBuf` or others, and to process the field record by the type.

* pulsar sql support avro schema `ByteBuffer` type

* add ByteBuf check and unit tests.
(cherry picked from commit 3aaed24)
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request May 27, 2020
Fixes apache#6749

### Motivation

Currently, the Pulsar SQL couldn't support AvroSchema use the `ByteBuffer` as the field type. For example, use the POJO class as below.

```
@DaTa
public static class LogFile {
    int id;
    String name;
    ByteBuffer data;
}

Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
```

Error Log
```
2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null},{"name":"data","type":["null","bytes"],"default":null}]}', schemaType=AVRO, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=3359, endPositionLedgerId=3359, schemaInfoProperties={"__alwaysAllowNull":"true"}} (start = 4.34095226272178E8, wall = 546 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Modifications

When the presto field record type is `VarbinaryType.VARBINARY`, check the record type is `ByteBuffer`, `byte[]`, `ByteBuf` or others, and to process the field record by the type.

* pulsar sql support avro schema `ByteBuffer` type

* add ByteBuf check and unit tests.
addisonj pushed a commit to instructure/pulsar that referenced this pull request Jun 12, 2020
Fixes apache#6749

### Motivation

Currently, the Pulsar SQL couldn't support AvroSchema use the `ByteBuffer` as the field type. For example, use the POJO class as below.

```
@DaTa
public static class LogFile {
    int id;
    String name;
    ByteBuffer data;
}

Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
```

Error Log
```
2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null},{"name":"data","type":["null","bytes"],"default":null}]}', schemaType=AVRO, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=3359, endPositionLedgerId=3359, schemaInfoProperties={"__alwaysAllowNull":"true"}} (start = 4.34095226272178E8, wall = 546 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Modifications

When the presto field record type is `VarbinaryType.VARBINARY`, check the record type is `ByteBuffer`, `byte[]`, `ByteBuf` or others, and to process the field record by the type.

* pulsar sql support avro schema `ByteBuffer` type

* add ByteBuf check and unit tests.
(cherry picked from commit 3aaed24)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
Fixes apache#6749

### Motivation

Currently, the Pulsar SQL couldn't support AvroSchema use the `ByteBuffer` as the field type. For example, use the POJO class as below.

```
@DaTa
public static class LogFile {
    int id;
    String name;
    ByteBuffer data;
}

Producer<LogFile> producer = pulsarClient.newProducer(Schema.AVRO(LogFile.class)).topic(topic).create();
```

Error Log
```
2020-05-08T23:34:47.079+0800	ERROR	SplitRunner-5-101	com.facebook.presto.execution.executor.TaskExecutor	Error processing Split 20200508_153445_00006_nxngm.1.0-1 PulsarSplit{splitId=1, connectorId='pulsar', originSchemaName='bytes-sql-test4', schemaName='public/default', tableName='bytes-sql-test4', splitSize=4, schema='{"type":"record","name":"LogFile","namespace":"com.ran.schema.KeyValueSchemaTest$","fields":[{"name":"id","type":"int"},{"name":"name","type":["null","string"],"default":null},{"name":"data","type":["null","bytes"],"default":null}]}', schemaType=AVRO, startPositionEntryId=5, endPositionEntryId=9, startPositionLedgerId=3359, endPositionLedgerId=3359, schemaInfoProperties={"__alwaysAllowNull":"true"}} (start = 4.34095226272178E8, wall = 546 ms, cpu = 0 ms, wait = 0 ms, calls = 1)
java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to [B
	at org.apache.pulsar.sql.presto.PulsarRecordCursor.getSlice(PulsarRecordCursor.java:516)
	at com.facebook.presto.spi.RecordPageSource.getNextPage(RecordPageSource.java:117)
	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:242)
	at com.facebook.presto.operator.Driver.processInternal(Driver.java:373)
	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:282)
	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:672)
	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:973)
	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:477)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Modifications

When the presto field record type is `VarbinaryType.VARBINARY`, check the record type is `ByteBuffer`, `byte[]`, `ByteBuf` or others, and to process the field record by the type.

* pulsar sql support avro schema `ByteBuffer` type

* add ByteBuf check and unit tests.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/sql Pulsar SQL related features release/2.5.2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cannot be cast to [B - presto connector
4 participants