Pulsar SQL supports pulsar's primitive schema #4728

Merged: 6 commits merged into apache:master on Jul 24, 2019

Conversation

congbobo184
Contributor

@congbobo184 congbobo184 commented Jul 15, 2019

Motivation

Continue the PR of #4151

Verifying this change

Added tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)

Documentation

Does this pull request introduce a new feature? (yes / no)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
If a feature is not applicable for documentation, explain why?
If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@sijie sijie added this to the 2.5.0 milestone Jul 16, 2019
@sijie sijie added component/schemaregistry area/sql Pulsar SQL related features type/feature The PR added a new feature or issue requested a new feature labels Jul 16, 2019
@sijie sijie modified the milestones: 2.5.0, 2.4.1 Jul 16, 2019
@@ -140,9 +140,10 @@ private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit puls
this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
this.pulsarConnectorConfig = pulsarConnectorConfig;

Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());

this.schemaHandler = getSchemaHandler(schema, pulsarSplit.getSchemaType(), columnHandles);
Contributor

Please delete the getSchemaHandler method, since it has been refactored into a separate class.


@Override
public Object deserialize(ByteBuf byteBuf) {
byte[] data = ByteBufUtil.getBytes(byteBuf);
Contributor

@jerrypeng jerrypeng Jul 16, 2019

I would recommend not allocating a new byte array every time we deserialize. This could heavily degrade performance. We should reuse pre-allocated buffers. Please take a look at what is done in the JSONSchemaHandler:

https://github.com/apache/pulsar/blob/master/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java#L58

Contributor Author

> I would recommend not allocating a new byte array every time we deserialize. This could heavily degrade performance. We should reuse pre-allocated buffers. Please take a look at what is done in the JSONSchemaHandler:
>
> https://github.com/apache/pulsar/blob/master/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java#L58

Pulsar's primitive schemas decode data from a byte[] and cannot be told how many bytes of it are valid, so we need to allocate a new byte[] unless we modify the primitive schemas' decode method.

Contributor

> Pulsar's primitive schemas decode data from a byte[] and cannot be told how many bytes of it are valid, so we need to allocate a new byte[] unless we modify the primitive schemas' decode method.

I don't quite follow. Why can't you just do something like this:

int size = payload.readableBytes();
byte[] buffer = tmpBuffer.get();
if (buffer.length < size) {
    // If the thread-local buffer is not big enough, replace it with
    // a bigger one
    buffer = new byte[size * 2];
    tmpBuffer.set(buffer);
}

payload.readBytes(buffer, 0, size);

schema.decode(buffer);
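
For reference, a self-contained sketch of the thread-local buffer-reuse pattern being suggested here; the class name and the copyToReusableBuffer helper are illustrative only, not part of the PR:

import io.netty.buffer.ByteBuf;

public class ReusableBufferSketch {
    // Scratch buffer kept per thread, grown on demand and reused across calls.
    private static final ThreadLocal<byte[]> tmpBuffer =
            ThreadLocal.withInitial(() -> new byte[1024]);

    // Copies the readable bytes of the payload into the reusable buffer and
    // returns the number of valid bytes; once the buffer has grown large
    // enough, no allocation happens on this path.
    static int copyToReusableBuffer(ByteBuf payload) {
        int size = payload.readableBytes();
        byte[] buffer = tmpBuffer.get();
        if (buffer.length < size) {
            buffer = new byte[size * 2];
            tmpBuffer.set(buffer);
        }
        payload.readBytes(buffer, 0, size);
        return size;
    }
}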

Contributor Author

> Pulsar's primitive schemas decode data from a byte[] and cannot be told how many bytes of it are valid, so we need to allocate a new byte[] unless we modify the primitive schemas' decode method.

> I don't quite follow. Why can't you just do something like this:
>
> int size = payload.readableBytes();
> byte[] buffer = tmpBuffer.get();
> if (buffer.length < size) {
>     // If the thread-local buffer is not big enough, replace it with
>     // a bigger one
>     buffer = new byte[size * 2];
>     tmpBuffer.set(buffer);
> }
>
> payload.readBytes(buffer, 0, size);
>
> schema.decode(buffer);

For example, StringSchema's decode method:

    public String decode(byte[] bytes) {
        if (null == bytes) {
            return null;
        } else {
            return new String(bytes, charset);
        }
    }

So we would need to overload the decode method to allow the number of readable bytes to be passed in.
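
A minimal sketch of the kind of overload being described, assuming a StringSchema-like class with a charset field; this overload does not exist in the PR and is shown for illustration only:

    // Hypothetical overload: decode only the first `length` bytes, so a reused,
    // possibly oversized buffer can be passed in directly.
    public String decode(byte[] bytes, int length) {
        if (null == bytes) {
            return null;
        }
        return new String(bytes, 0, length, charset);
    }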

Contributor

I see. I think you can just use byteBuf.array(). I looked at the code and it seems to just return the underlying byte array.

Contributor Author

> I see. I think you can just use byteBuf.array(). I looked at the code and it seems to just return the underlying byte array.

But not all ByteBuf implementations support that method; a direct buffer, for example, has no accessible backing array.
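
For context, the usual Netty idiom guards array() with hasArray(), since array() throws UnsupportedOperationException on, for example, direct buffers. A small sketch (the toBytes helper name is illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

class ByteBufBytes {
    // Returns the message bytes without copying when the buffer is backed by an
    // accessible array that exactly covers the readable region; otherwise copies.
    static byte[] toBytes(ByteBuf byteBuf) {
        if (byteBuf.hasArray()
                && byteBuf.arrayOffset() == 0
                && byteBuf.readerIndex() == 0
                && byteBuf.array().length == byteBuf.readableBytes()) {
            return byteBuf.array();
        }
        // Direct or sliced buffers: fall back to copying the readable bytes.
        return ByteBufUtil.getBytes(byteBuf);
    }
}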

Member

In general we should just improve our Schema interface to take a ByteBuf instead of a byte array. We shouldn't handle this separately in the Presto plugin; that is against the purpose of moving Presto to use the Schema interface, which is why I started this change.

I would suggest creating a follow-up issue to add methods in Schema that support deserializing from ByteBuf, addressing the problem in the Schema implementations, and then changing the code here.
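
One possible shape for that follow-up, sketched here only to make the suggestion concrete; the class name ByteBufAwareSchema and its default behavior are assumptions, not an existing Pulsar API:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.client.api.Schema;

// Base class adding a ByteBuf-aware decode; the default copies so existing
// byte[]-based schemas keep working, and concrete schemas can override it to
// read from the ByteBuf directly and avoid the allocation.
public abstract class ByteBufAwareSchema<T> implements Schema<T> {
    public T decode(ByteBuf byteBuf) {
        if (byteBuf == null) {
            return null;
        }
        return decode(ByteBufUtil.getBytes(byteBuf));
    }
}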

Contributor

@sijie can you create a follow-up issue then?

Member

return VarbinaryType.VARBINARY;
case STRING:
return VarcharType.VARCHAR;
case DATE:
Contributor

We should use the corresponding Presto time data types here:

https://prestodb.github.io/docs/current/language/types.html
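
To make the suggestion concrete, a sketch of how those cases might map to Presto's dedicated temporal types, assuming the com.facebook.presto.spi.type package already used by this connector; the helper name is illustrative:

import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.TimeType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import org.apache.pulsar.common.schema.SchemaType;

class TemporalTypeMapping {
    // Maps Pulsar temporal schema types to Presto's temporal types instead of
    // falling back to BIGINT or VARCHAR.
    static Type toPrestoType(SchemaType schemaType) {
        switch (schemaType) {
            case DATE:
                return DateType.DATE;
            case TIME:
                return TimeType.TIME;
            case TIMESTAMP:
                return TimestampType.TIMESTAMP;
            default:
                throw new IllegalArgumentException("Not a temporal type: " + schemaType);
        }
    }
}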

Contributor

@jerrypeng jerrypeng left a comment

Generally looks good! Just a few comments. @congbobo184 I would also recommend you profile the code (primitive schema code path) with YourKit or another Java profiler to make sure unnecessary objects are not being allocated, especially on the critical path.

@Override
public Object deserialize(ByteBuf byteBuf) {
byte[] data = ByteBufUtil.getBytes(byteBuf);
return schema.decode(data);
Contributor

For time-based types, i.e. Date, Time, and Timestamp, there is not really any point in deserializing them to POJOs, as that creates additional objects that need to be allocated and then GCed. For those types we can simply return a long, which is pretty much what those schemas already do anyway, and also what the extractField method does below.

@congbobo184
Contributor Author

> Generally looks good! Just a few comments. @congbobo184 I would also recommend you profile the code (primitive schema code path) with YourKit or another Java profiler to make sure unnecessary objects are not being allocated, especially on the critical path.

Thanks for reviewing my PR. I will carefully consider the comments you left and update my code.

@codelipenghui
Contributor

run Integration Tests
run java8 tests

public Object deserialize(ByteBuf byteBuf) {
byte[] data = ByteBufUtil.getBytes(byteBuf);
Object currentRecord = schema.decode(data);
switch (schemaInfo.getType()) {
Contributor

For these date, time, and timestamp types, I think we can just decode with the LongSchema. For example, there is no point in deserializing to a "Date" object just to get the long value via "getTime()".
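
A small sketch of the shortcut being discussed (not what the PR ends up doing), assuming Pulsar's DateSchema stores the value as the same 8-byte big-endian long that INT64 uses:

import java.util.Date;
import org.apache.pulsar.client.api.Schema;

public class TemporalDecodeSketch {
    public static void main(String[] args) {
        byte[] payload = Schema.DATE.encode(new Date(1563706357298L));
        // Decoding through the Date schema allocates an intermediate java.util.Date...
        long viaDate = Schema.DATE.decode(payload).getTime();
        // ...while decoding with INT64 yields the epoch millis directly.
        long viaLong = Schema.INT64.decode(payload);
        System.out.println(viaDate == viaLong); // expected: true
    }
}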

Member

In principle I would suggest not shortcutting the schema implementation; it would make maintenance very hard, because if we change anything in a schema implementation, we might forget to update the corresponding part in Presto. For example, if we change the implementation in the Date schema to include tz information while Presto is using LongSchema, that bypasses the whole backward-compatibility logic and will cause problems.

I would actually encourage sticking to Schema and letting it take care of all the backward-compatibility handling. If this part becomes a bottleneck, we can always find a way to improve it. Correctness and maintainability are the top priorities for pulsar-presto at this moment.

Contributor

@jerrypeng jerrypeng Jul 22, 2019

@sijie Schema in Pulsar is designed for serializing a POJO to bytes and vice versa. That is a different scenario from what happens in the Presto connector, especially here: in the Presto connector we need to map Pulsar types to Presto types, which is a different goal from what Schema is designed for.

> For example, if we change the implementation in the Date schema to include tz information

We will have to change the logic here regardless if we decide to add tz info. The current logic in this PR still returns a long (timestamp) at the end of the day, which will not contain any TZ info; it just does so by creating unnecessary objects like "Date".

We can leave the logic as it is now, but let's create some issues for improving the performance of this and of Schema.

Contributor

Also, if we intend to use only the existing schemas for deserializing data in the Presto connector, I would suggest optimizing the schemas for the Presto connector's use cases first, e.g. adding ByteBuf support, using DSL-JSON, etc., before we replace the current deserializing code.

Member

@jerrypeng

> That is a different scenario from what happens in the Presto connector, especially here: in the Presto connector we need to map Pulsar types to Presto types, which is a different goal from what Schema is designed for.

How is that different? The schema defines the types stored in Pulsar. It is the source of truth for interpreting the Pulsar types, because it handles schema versioning and schema evolution. It is not simply deserialization and serialization.

If Presto wants to deserialize and serialize the raw bytes, it can define its own way to do so. But if it is using the schema types defined in Pulsar, it should stick to the Pulsar schema, because the Pulsar schema handles versioning and evolution. It is technically wrong to bypass it.

> We will have to change the logic here regardless if we decide to add tz info. The current logic in this PR still returns a long (timestamp) at the end of the day, which will not contain any TZ info; it just does so by creating unnecessary objects like "Date".

Yes. That's called schema evolution. That's why we added versioning and many other features to handle schema evolution, and that's why I am so insistent on using the Schema interface and implementations. You can't bypass them. The moment you bypass the schema interface and implementations, you are bypassing schema versioning and schema evolution. When a schema evolves, you have to re-implement the logic in Presto, and that is going to be a disaster.

Member

> Also, if we intend to use only the existing schemas for deserializing data in the Presto connector, I would suggest optimizing the schemas for the Presto connector's use cases first, e.g. adding ByteBuf support, using DSL-JSON, etc., before we replace the current deserializing code.

@jerrypeng this PR is NOT replacing any deserializing code. It doesn't touch AVRO and JSON. This PR adds support for new types and doesn't change any existing behavior. I have created a follow-up issue to optimize the date types.

We can optimize schemas first before changing AVRO and JSON to use the schema implementation. But it doesn't make any sense to block this PR.

Contributor

@jerrypeng jerrypeng Jul 22, 2019

> @jerrypeng this PR is NOT replacing any deserializing code. It doesn't touch AVRO and JSON. This PR adds support for new types and doesn't change any existing behavior. I have created a follow-up issue to optimize the date types.
>
> We can optimize schemas first before changing AVRO and JSON to use the schema implementation. But it doesn't make any sense to block this PR.

My comment is about future work, NOT about this PR. Sorry if that wasn't clear. In an ideal situation we would just use Schema for deserializing data in Presto, but I don't want to sacrifice performance to do so.

I am OK with this PR going in.

Member

> In an ideal situation we would just use Schema for deserializing data in Presto, but I don't want to sacrifice performance to do so.

I think we agreed on this.

@congbobo184
Contributor Author

run java8 tests

2 similar comments
@codelipenghui
Contributor

run java8 tests

@sijie
Member

sijie commented Jul 21, 2019

run java8 tests

@sijie
Member

sijie commented Jul 21, 2019

run java8 tests

@sijie
Member

sijie commented Jul 21, 2019

@congbobo184 there is a test failure related to this change. Can you please take a look?

Error Message
expected [1563706357298] but found [Sun Jul 21 10:52:37 UTC 2019]
Stacktrace
java.lang.AssertionError: expected [1563706357298] but found [Sun Jul 21 10:52:37 UTC 2019]
	at org.testng.Assert.fail(Assert.java:96)
	at org.testng.Assert.failNotEquals(Assert.java:776)
	at org.testng.Assert.assertEqualsImpl(Assert.java:137)
	at org.testng.Assert.assertEquals(Assert.java:118)
	at org.testng.Assert.assertEquals(Assert.java:442)
	at org.apache.pulsar.sql.presto.TestPulsarPrimitiveSchemaHandler.testPulsarPrimitiveSchemaHandler(TestPulsarPrimitiveSchemaHandler.java:120)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:54)
	at org.testng.internal.InvokeMethodRunnable.run(InvokeMethodRunnable.java:44)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
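
For illustration only, a hypothetical TestNG snippet showing the shape of this mismatch: the handler appears to return a java.util.Date while the test expects the epoch millis as a long, so comparing the two directly fails:

import java.util.Date;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;

public class DateVersusLongSketch {
    @Test
    public void illustrateMismatch() {
        long expectedMillis = 1563706357298L;
        Object deserialized = new Date(expectedMillis); // stand-in for the handler's output
        // assertEquals(deserialized, expectedMillis) would fail with
        // "expected [1563706357298] but found [Sun Jul 21 ...]".
        // Comparing longs (or having the handler return a long) passes:
        assertEquals(((Date) deserialized).getTime(), expectedMillis);
    }
}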

…_pulsar_primitive_schemas

# Conflicts:
#	pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@congbobo184
Contributor Author

run java8 tests

1 similar comment
@sijie
Member

sijie commented Jul 22, 2019

run java8 tests

@congbobo184
Contributor Author

run java8 tests

1 similar comment
@congbobo184
Contributor Author

run java8 tests

@sijie sijie merged commit 1ab35b0 into apache:master Jul 24, 2019
easyfan pushed a commit to easyfan/pulsar that referenced this pull request Jul 26, 2019
jiazhai pushed a commit that referenced this pull request Aug 28, 2019
### Motivation
Continue the PR of #4151
(cherry picked from commit 1ab35b0)
Labels
area/sql Pulsar SQL related features type/feature The PR added a new feature or issue requested a new feature

5 participants