Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar SQL] Fix OffloadPolicies json serialization error in Pulsar SQL #9300

Merged

Conversation

gaoran10
Copy link
Contributor

@gaoran10 gaoran10 commented Jan 24, 2021

Motivation

  1. The class OffloadPolicies couldn't be serialized to JSON bytes array in Pulsar SQL.
  2. The PulsarSplitManager can't split data in the tiered storage.

serialization error log

2021-01-23T03:20:15.403Z	ERROR	remote-task-callback-9	io.airlift.concurrent.BoundedExecutor	Task failed
java.lang.IllegalArgumentException: io.prestosql.server.TaskUpdateRequest could not be converted to JSON
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
	at io.prestosql.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:513)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.prestosql.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->io.prestosql.execution.TaskSource["splits"]->com.google.common.collect.RegularImmutableSet[0]->io.prestosql.execution.ScheduledSplit["split"]->io.prestosql.metadata.Split["connectorSplit"]->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1277)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:640)
	at io.prestosql.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400)
	at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1509)
	at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1215)
	at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1109)
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
	... 5 more

Modifications

  1. Add JsonProperty annotation for OffloadPolicies config fields.
  2. Set the right LedgerOffloader for the split manager to read data from tiered storage.
  3. Add a new apply-config-from-env-with-prefix.py to set new configs with a specific prefix to config files.

Verifying this change

This change can be verified as follows:

  • org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@sijie
Copy link
Member

sijie commented Jan 25, 2021

/pulsarbot run-failure-checks

@zymap
Copy link
Member

zymap commented Jan 25, 2021

01:19:21.875 [main:org.apache.pulsar.tests.integration.topologies.PulsarCluster@438] INFO  org.apache.pulsar.tests.integration.topologies.PulsarCluster - [startPrestoWorker] set offload env offloadDriver: s3, offloadProperties: {"s3ManagedLedgerOffloadBucket":"pulsar-integtest","s3ManagedLedgerOffloadServiceEndpoint":"http://s3:9090"}
------- Starting test [TestClass name=class org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage].testQueryTieredStorage1([])-------
~~~~~~~~~ SKIPPED -- [TestClass name=class org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage].testQueryTieredStorage1([])-------------- Starting test [TestClass name=class org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage].testQueryTieredStorage2([])-------
Error: ~~ SKIPPED -- [TestClass name=class org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage].testQueryTieredStorage2([])-------[ERROR] Tests run: 9, Failures: 2, Errors: 0, Skipped: 5, Time elapsed: 192.13 s <<< FAILURE! - in TestSuite
Error:  pulsar-test-suite(org.apache.pulsar.tests.integration.presto.TestBasicPresto)  Time elapsed: 19.202 s  <<< FAILURE!
java.lang.NullPointerException
	at org.apache.pulsar.tests.integration.topologies.PulsarCluster.stopPrestoWorker(PulsarCluster.java:405)
	at org.apache.pulsar.tests.integration.presto.TestBasicPresto.teardownPresto(TestBasicPresto.java:69)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:61)
	at org.testng.internal.ConfigInvoker.invokeConfigurationMethod(ConfigInvoker.java:366)
	at org.testng.internal.ConfigInvoker.invokeConfigurations(ConfigInvoker.java:320)
	at org.testng.internal.TestMethodWorker.invokeAfterClassMethods(TestMethodWorker.java:217)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:130)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.testng.TestRunner.privateRun(TestRunner.java:764)
	at org.testng.TestRunner.run(TestRunner.java:585)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
	at org.testng.TestNG.runSuites(TestNG.java:1069)
	at org.testng.TestNG.run(TestNG.java:1037)
	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:283)
	at org.apache.maven.surefire.testng.TestNGXmlTestSuite.execute(TestNGXmlTestSuite.java:75)
	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:120)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

Error:  pulsar-test-suite(org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage)  Time elapsed: 52.488 s  <<< FAILURE!
java.lang.NullPointerException
	at org.apache.pulsar.tests.integration.topologies.PulsarCluster.startPrestoFollowWorkers(PulsarCluster.java:416)
	at org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage.setupPresto(TestPrestoQueryTieredStorage.java:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.MethodInvocationHelper.invokeMethodConsideringTimeout(MethodInvocationHelper.java:61)
	at org.testng.internal.ConfigInvoker.invokeConfigurationMethod(ConfigInvoker.java:366)
	at org.testng.internal.ConfigInvoker.invokeConfigurations(ConfigInvoker.java:320)
	at org.testng.internal.TestMethodWorker.invokeBeforeClassMethods(TestMethodWorker.java:176)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:122)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.testng.TestRunner.privateRun(TestRunner.java:764)
	at org.testng.TestRunner.run(TestRunner.java:585)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
	at org.testng.SuiteRunner.run(SuiteRunner.java:286)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
	at org.testng.TestNG.runSuites(TestNG.java:1069)
	at org.testng.TestNG.run(TestNG.java:1037)
	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:283)
	at org.apache.maven.surefire.testng.TestNGXmlTestSuite.execute(TestNGXmlTestSuite.java:75)
	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:120)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

[INFO] 
[INFO] Results:
[INFO] 
Error:  Failures: 
Error:  org.apache.pulsar.tests.integration.presto.TestBasicPresto.pulsar-test-suite(org.apache.pulsar.tests.integration.presto.TestBasicPresto)
[INFO]   Run 1: PASS
[INFO]   Run 2: PASS
Error:    Run 3: TestBasicPresto.teardownPresto:69 » NullPointer
[INFO]   Run 4: PASS
[INFO] 
Error:  org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage.pulsar-test-suite(org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage)
Error:    Run 1: TestPrestoQueryTieredStorage.setupPresto:86 » NullPointer
[INFO]   Run 2: PASS
[INFO]   Run 3: PASS
[INFO]   Run 4: PASS
[INFO]   Run 5: PASS
[INFO] 
[INFO] 
Error:  Tests run: 2, Failures: 2, Errors: 0, Skipped: 0

@gaoran10 The test is failed. Please take a look when you have time.

@jiazhai
Copy link
Member

jiazhai commented Jan 26, 2021

@gaoran10 is working on a fix

@codelipenghui
Copy link
Contributor

/pulsarbot run-failure-checks

@gaoran10
Copy link
Contributor Author

/pulsarbot run-failure-checks

1 similar comment
@gaoran10
Copy link
Contributor Author

/pulsarbot run-failure-checks

@sijie sijie merged commit 7dcc2ae into apache:master Jan 28, 2021
codelipenghui pushed a commit that referenced this pull request Jan 30, 2021
…QL (#9300)

1. The class `OffloadPolicies` couldn't be serialized to JSON bytes array in Pulsar SQL.
2. The PulsarSplitManager can't split data in the tiered storage.

serialization error log
```
2021-01-23T03:20:15.403Z	ERROR	remote-task-callback-9	io.airlift.concurrent.BoundedExecutor	Task failed
java.lang.IllegalArgumentException: io.prestosql.server.TaskUpdateRequest could not be converted to JSON
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
	at io.prestosql.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:513)
	at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.prestosql.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->io.prestosql.execution.TaskSource["splits"]->com.google.common.collect.RegularImmutableSet[0]->io.prestosql.execution.ScheduledSplit["split"]->io.prestosql.metadata.Split["connectorSplit"]->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1277)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:640)
	at io.prestosql.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
	at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400)
	at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1509)
	at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1215)
	at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1109)
	at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
	... 5 more
```

1. Add `JsonProperty` annotation for `OffloadPolicies` config fields.
2. Set the right LedgerOffloader for the split manager to read data from tiered storage.
3. Add a new `apply-config-from-env-with-prefix.py` to set new configs with a specific prefix to config files.

This change can be verified as follows:

  - *org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage*

(cherry picked from commit 7dcc2ae)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jan 30, 2021
@gaoran10 gaoran10 deleted the fix-pulsar-sql-split-offload-policies-param branch February 4, 2021 15:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/sql Pulsar SQL related features cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants