
mysql JDBC Sink - consumer error #5454

Closed
jm7647 opened this issue Oct 23, 2019 · 26 comments · Fixed by #5930
Labels
type/bug

Comments

jm7647 commented Oct 23, 2019

Describe the bug
Running the MySQL JDBC sink (set up with help in issue #5418), but unable to receive data from the producer.

To Reproduce
Steps to reproduce the behavior:

  1. Define the Test schema and send producer data:

     import pulsar
     from pulsar.schema import AvroSchema, Record, Integer, String

     class Test(Record):
         id = Integer()
         name = String()

     client = pulsar.Client('pulsar://localhost:6650')  # broker URL as used elsewhere in this thread
     producer = client.create_producer(
                     'pulsar-mysql-jdbc-sink-topic',
                     schema=AvroSchema(Test))

  2. The following error appears in the sink logs:

19:25:55.712 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x79b6a514, L:/135.25.67.33:33184 - R:mtn21cvaas04.mt.att.com/135.25.67.33:6650]
19:25:55.734 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribed to topic on mtn21cvaas04.mt.att.com/135.25.67.33:6650 -- consumer: 0

19:29:41.376 [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
19:29:41.427 [public/default/pulsar-mysql-jdbc-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/pulsar-mysql-jdbc-sink:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:770) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:4153) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4158) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5147) ~[com.google.guava-guava-21.0.jar:?]
at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:72) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:36) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.api.Schema.decode(Schema.java:97) ~[java-instance.jar:?]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:268) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:2.4.1]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:463) ~[org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:236) [org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
19:29:41.504 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
19:29:41.510 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic] [public/default/pulsar-mysql-jdbc-sink] Closed consumer
19:29:41.512 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.io.jdbc.JdbcAbstractSink - Closed jdbc connection: jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false
19:29:41.518 [public/default/pulsar-mysql-jdbc-sink-0] INFO org.apache.pulsar.functions.instance.JavaInstanceRunnable - Unloading JAR files for function InstanceConfig(instanceId=0, functionId=5703beea-ca01-446b-8d1a-35167aeb2cac, functionVersion=df4c3460-d718-4684-9109-3d081cfb6f1e, functionDetails=tenant: "public"
namespace: "default"
name: "pulsar-mysql-jdbc-sink"
className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord"
inputSpecs {
key: "pulsar-mysql-jdbc-sink-topic"
value {
}
}
cleanupSubscription: true
}
sink {
className: "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink"
configs: "{"userName":"root","password":"jdbc","jdbcUrl":"jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink?useUnicode\u003dtrue\u0026characterEncoding\u003dutf-8\u0026useSSL\u003dfalse","tableName":"pulsar_mysql_jdbc_sink"}"
typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: SINK
, maxBufferedTuples=1024, functionAuthenticationSpec=null, port=37644, clusterName=standalone)
19:29:41.518 [main] INFO org.apache.pulsar.functions.runtime.JavaInstanceStarter - RuntimeSpawner quit, shutting down JavaInstance
19:29:41.520 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://mtn21cvaas04.mt.att.com:6650
19:29:41.530 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x79b6a514, L:/135.25.67.33:33184 ! R:mtn21cvaas04.mt.att.com/135.25.67.33:6650] Disconnected

Expected behavior
Expected to see no error and some records in the MySQL DB.

Desktop (please complete the following information):

  • OS: Ubuntu 16.04

Additional context
It may be a misconfiguration and not a bug.
The sink is using the sample schema provided by the Pulsar instructions:

{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
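
For reference, the sink configuration logged above corresponds to a sink config file along these lines (a reconstruction from the configs JSON in the log, not necessarily the exact pulsar-mysql-jdbc-sink.yaml used):

```yaml
configs:
  userName: "root"
  password: "jdbc"
  jdbcUrl: "jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  tableName: "pulsar_mysql_jdbc_sink"
```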

jm7647 added the type/bug label Oct 23, 2019
tuteng (Member) commented Oct 24, 2019

Please show your producer's code.

The following is an example for reference:

@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Foo3 {
    public String field1;
    public String field2;
    public int field3;
}

       PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();


        AvroSchema<Foo3> schema = AvroSchema.of(SchemaDefinition.<Foo3>builder().withPojo(Foo3.class).withAlwaysAllowNull(false).build());
        Producer<Foo3> producer = client.newProducer(schema)
                .topic("test-jdbc")
                .create();
        for (int i = 0; i < 20; i++) {
            String key = "key-" + i;

            Foo3 obj = new Foo3();
            obj.setField1("field1_insert_" + i);
            obj.setField2("field2_insert_" + i);
            obj.setField3(i);
            Map<String, String> properties = Maps.newHashMap();
            properties.put("EVENT", "INSERT");

            producer.newMessage()
                    .properties(properties)
                    .key(key)
                    .value(obj)
                    .send();
        }

        for (int i = 0; i < 20; i++) {
            String key = "key-" + i;

            Foo3 obj = new Foo3();
            obj.setField1("field1_insert_" + i);
            obj.setField2("field2_update_" + i);
            obj.setField3(i);
            Map<String, String> properties = Maps.newHashMap();
            properties.put("EVENT", "UPDATE");

            producer.newMessage()
                    .properties(properties)
                    .key(key)
                    .value(obj)
                    .send();
        }

        for (int i = 0; i < 20; i++) {
            String key = "key-" + i;

            Foo3 obj = new Foo3();
            obj.setField1("field1_insert_" + i);
            obj.setField2("field2_delete_" + i);
            obj.setField3(i);
            Map<String, String> properties = Maps.newHashMap();
            properties.put("EVENT", "DELETE");

            producer.newMessage()
                    .properties(properties)
                    .key(key)
                    .value(obj)
                    .send();
        }
        producer.close();
        client.close();

jm7647 (Author) commented Oct 24, 2019

I was hoping you had a Python example.
There is nothing I could find on your website about a producer for that sink.
All links related to built-in connectors are broken:

https://pulsar.apache.org/docs/en/io-connectors/

tuteng (Member) commented Oct 24, 2019

http://pulsar.apache.org/docs/en/next/io-connectors/
It is already being fixed: #5347

jm7647 (Author) commented Oct 24, 2019

This is the producer code I used, based on your sample; still the same error:

    producer = client.create_producer(
                    topic='pulsar-mysql-jdbc-sink-topic',
                    properties={"EVENT": "INSERT"},
                    schema=AvroSchema(Test))

    r = Test(id=1, name="Hello World")
    producer.send(r)

Additional observation:

The above error also stops the sink: it attempts to restart in a loop but fails with the same error, so according to the status it is no longer running:

bin/pulsar-admin sinks status --tenant public --namespace default --name pulsar-mysql-jdbc-sink

{
"numInstances" : 1,
"numRunning" : 0,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : false,

sijie (Member) commented Oct 27, 2019

@tuteng I suspect the problem comes from the schema uploaded by the Python client. Can you try to reproduce the problem using the example provided by @jm7647?

tuteng (Member) commented Oct 29, 2019

Please try using localrun to start the sink and look at the broker's log.

bin/pulsar-admin sinks localrun --archive ./connectors/pulsar-io-jdbc-2.4.1.nar --inputs pulsar-mysql-jdbc-sink-topic --name pulsar-mysql-jdbc-sink --sink-config-file ./connectors/pulsar-mysql-jdbc-sink.yaml --parallelism 1

References: http://pulsar.apache.org/docs/en/next/io-debug/#debug-in-localrun-mode
@jm7647

jm7647 (Author) commented Oct 29, 2019

Tried localrun. Below is the result from the main broker log.
(Notice "Closing consumer: 0", then "Closing consumer: 1"; it continues in a loop.)

14:25:12.806 [pulsar-io-50-3] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /135.25.67.53:33764
14:25:12.810 [pulsar-io-50-3] INFO org.apache.pulsar.broker.service.ServerCnx - [/135.25.67.53:33764][persistent://public/default/pulsar-mysql-jdbc-sink-topic] Creating producer. producerId=0
14:25:12.817 [ForkJoinPool.commonPool-worker-3] INFO org.apache.pulsar.broker.service.ServerCnx - [/135.25.67.53:33764] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic}, client=/135.25.67.53:33764, producerName=standalone-8-51, producerId=0}
14:25:12.818 [bookkeeper-ml-workers-OrderedExecutor-0-0] WARN org.apache.bookkeeper.client.BookieWatcherImpl - New ensemble: [127.0.0.1:3181] is not adhering to Placement Policy. quarantinedBookies: []
14:25:12.831 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 25036
14:25:12.831 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/pulsar-mysql-jdbc-sink-topic] Created new ledger 25036
14:25:12.836 [pulsar-io-50-3] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic}][standalone-8-51] Closing producer on cnx /135.25.67.53:33764
14:25:12.836 [pulsar-io-50-3] INFO org.apache.pulsar.broker.service.ServerCnx - [PersistentTopic{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic}][standalone-8-51] Closed producer on cnx /135.25.67.53:33764
14:25:12.844 [pulsar-io-50-3] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /135.25.67.53:33764
14:25:12.922 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42854] Closing consumer: 0
14:25:12.922 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic, name=public/default/pulsar-mysql-jdbc-sink}, consumerId=0, consumerName=d4dfc, address=/127.0.0.1:42854} with pending 1 acks
14:25:12.922 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42854] Closed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic, name=public/default/pulsar-mysql-jdbc-sink}, consumerId=0, consumerName=d4dfc, address=/127.0.0.1:42854}
14:25:14.810 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42854] Subscribing on topic persistent://public/default/pulsar-mysql-jdbc-sink-topic / public/default/pulsar-mysql-jdbc-sink
14:25:14.813 [ForkJoinPool.commonPool-worker-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/pulsar-mysql-jdbc-sink-topic-public%2Fdefault%2Fpulsar-mysql-jdbc-sink] Rewind from 25036:1 to 25036:0
14:25:14.813 [ForkJoinPool.commonPool-worker-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/pulsar-mysql-jdbc-sink-topic] There are no replicated subscriptions on the topic
14:25:14.813 [ForkJoinPool.commonPool-worker-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Created new subscription for 1
14:25:14.813 [ForkJoinPool.commonPool-worker-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42854] Created subscription on topic persistent://public/default/pulsar-mysql-jdbc-sink-topic / public/default/pulsar-mysql-jdbc-sink
14:25:14.818 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:42854] Closing consumer: 1
14:25:14.818 [pulsar-io-50-2] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/pulsar-mysql-jdbc-sink-topic, name=public/default/pulsar-mysql-jdbc-sink}, consumerId=1, consumerName=03170, address=/127.0.0.1:42854} with pending 1 acks

And here is the log from the sink; it shows the same null error:

14:22:15.535 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x1643772d, L:/127.0.0.1:42854 - R:localhost/127.0.0.1:6650]] Connected to server
14:22:15.535 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x1643772d, L:/127.0.0.1:42854 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at mtn21cvaas04.mt.att.com:6650
14:22:15.537 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x1643772d, L:/127.0.0.1:42854 - R:localhost/127.0.0.1:6650]
14:22:15.567 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

14:25:12.868 [pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
14:25:12.935 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic] [public/default/pulsar-mysql-jdbc-sink] Closed consumer
14:25:12.943 [main] INFO org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
java.lang.NullPointerException: null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:770) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache.get(LocalCache.java:4153) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4158) ~[com.google.guava-guava-21.0.jar:?]
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5147) ~[com.google.guava-guava-21.0.jar:?]
at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:72) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:36) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.api.Schema.decode(Schema.java:97) ~[org.apache.pulsar-pulsar-client-api-2.4.1.jar:2.4.1]
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:268) ~[org.apache.pulsar-pulsar-client-original-2.4.1.jar:2.4.1]
at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:2.4.1]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:463) ~[org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:2.4.1]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:236) ~[org.apache.pulsar-pulsar-functions-instance-2.4.1.jar:2.4.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]
14:25:14.130 [function-timer-thread-5-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/pulsar-mysql-jdbc-sink-java.lang.NullPointerException Function Container is dead with exception.. restarting
14:25:14.131 [function-timer-thread-5-1] INFO org.apache.pulsar.functions.runtime.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=ebfdf9d9-c659-4ea0-8ec0-4743afdef987, functionVersion=e1c30948-ad78-4aee-a2cd-f245a77256ed, functionDetails=tenant: "public"
namespace: "default"
name: "pulsar-mysql-jdbc-sink"
className: "org.apache.pulsar.functions.api.utils.IdentityFunction"
autoAck: true
parallelism: 1
source {
typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord"
inputSpecs {
key: "pulsar-mysql-jdbc-sink-topic"
value {
}
}
cleanupSubscription: true
}
sink {
className: "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink"
configs: "{"userName":"root","password":"jdbc","jdbcUrl":"jdbc:mysql://127.0.0.1:3306/pulsar_mysql_jdbc_sink?useSSL\u003dfalse","tableName":"pulsar_mysql_jdbc_sink"}"
typeClassName: "org.apache.pulsar.client.api.schema.GenericRecord"
}
resources {
cpu: 1.0
ram: 1073741824
disk: 10737418240
}
componentType: SINK
, maxBufferedTuples=1024, functionAuthenticationSpec=null, port=35219, clusterName=local)
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
14:25:14.797 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Configuring schema for topic pulsar-mysql-jdbc-sink-topic : {
"name": "public/default/pulsar-mysql-jdbc-sink-topic",
"schema": {
"type": "record",
"name": "Test",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
}
]
},
"type": "AVRO",
"properties": {}
}
14:25:14.799 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema for topic pulsar-mysql-jdbc-sink-topic : {"type":"record","name":"Test","fields":[{"name":"id","type":["null","int"]},{"name":"name","type":["null","string"]}]}
14:25:14.806 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer perf with config: {
"topicNames" : [ "pulsar-mysql-jdbc-sink-topic" ],
"topicsPattern" : null,
"subscriptionName" : "public/default/pulsar-mysql-jdbc-sink",
"subscriptionType" : "Shared",
"receiverQueueSize" : 1000,
"acknowledgementsGroupTimeMicros" : 100000,
"negativeAckRedeliveryDelayMicros" : 60000000,
"maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
"consumerName" : null,
"ackTimeoutMillis" : 0,
"tickDurationMillis" : 1000,
"priorityLevel" : 0,
"cryptoFailureAction" : "CONSUME",
"properties" : {
"application" : "pulsar-sink",
"id" : "public/default/pulsar-mysql-jdbc-sink",
"instance_id" : "0"
},
"readCompacted" : false,
"subscriptionInitialPosition" : "Latest",
"patternAutoDiscoveryPeriod" : 1,
"regexSubscriptionMode" : "PersistentOnly",
"deadLetterPolicy" : null,
"autoUpdatePartitions" : true,
"replicateSubscriptionState" : false,
"resetIncludeHead" : false
}
14:25:14.809 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650",
"authPluginClassName" : null,
"authParams" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : null,
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"defaultBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 30000000000
}
14:25:14.810 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x1643772d, L:/127.0.0.1:42854 - R:localhost/127.0.0.1:6650]
14:25:14.814 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic][public/default/pulsar-mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
14:25:14.818 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [pulsar-mysql-jdbc-sink-topic] [public/default/pulsar-mysql-jdbc-sink] Closed consumer

tuteng (Member) commented Oct 30, 2019

Thank you, I have reproduced this problem locally and am looking at how to fix it.
It seems to be caused by the Python client's failure to handle multiple schema versions. If possible, use the Java client first.

sijie (Member) commented Oct 30, 2019

@tuteng is correct. The problem is that non-Java clients don't attach the schema version to messages when producing them. The fix should be straightforward: attach the schema version to messages in the C++ client.

To get around this problem for now, please use the Java client to produce Avro messages.

sijie pushed a commit that referenced this issue Jan 1, 2020
Fixes #5454 


### Motivation

The current C++ client cannot correctly obtain the schema version, which results in a parsing error on the Java client side when data is sent with a schema from the Python client.

Test code:

```
import pulsar
import json

from pulsar.schema import *

class Test(Record):
    name = String()
    id = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('test-producer-schema', schema=AvroSchema(Test))
producer.send(Test(name='Hello', id=1))
client.close()
```



### Modifications

* Set the schema version in MessageMetadata

### Verifying this change

Add a check of the schema version in the SchemaTest unit test
nicolo-paganin commented Feb 15, 2020

I have the same problem with pulsar-client v2.5.0 (installed with pip install pulsar-client==2.5.0).
I have a Python Pulsar function that copies messages from topic1 to topic2.
topic1 is populated by a Java producer, and my sink handles it correctly.
The second topic, created by the Python Pulsar function, triggers this exception:

org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/pulsar-mysql-jdbc-sink:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:770) ~[com.google.guava-guava-21.0.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:4153) ~[com.google.guava-guava-21.0.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4158) ~[com.google.guava-guava-21.0.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5147) ~[com.google.guava-guava-21.0.jar:?]
	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:94) ~

@tuteng does the fix solve this problem in your case?

Thanks

tuteng (Member) commented Feb 15, 2020

Can you describe your environment (container, standalone, or cluster) so that I can quickly reproduce this problem? @nicolo-paganin Thanks.

nicolo-paganin commented Feb 16, 2020

@tuteng yes, I am running Pulsar standalone version 2.5.0 from binaries (extracted from the binary release).
I am running it on macOS 10.15.3 with Python 3.7.3 and pulsar-client 2.5.0 installed globally with pip.

The Python Pulsar function copies the value from one topic with an Avro schema directly to another topic, something like this (I simplified my function, removing implementations not needed for the example):

from pulsar import Function, SerDe
from pulsar.schema import *
import fastavro
import io


class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)

class AvroSerDe(SerDe):
    def __init__(self):
        pass

    def serialize(self, object):
        return object

class BaseFunction(Function):
    def __init__(self):
        self.parsed_schema = SensorSchema.schema()  # fastavro.parse_schema(schema)

    def process(self, input, context):
        buffer = io.BytesIO(input)
        d = fastavro.schemaless_reader(buffer, self.parsed_schema)
        newValue = self.customProcess(d, context)
        outbuffer = io.BytesIO()
        fastavro.schemaless_writer(outbuffer, self.parsed_schema, newValue)
        return outbuffer.getvalue()

    def customProcess(self, input, context):
        return {}


class CopyFunction(BaseFunction):
    def customProcess(self, input, context):

        output = {'value': input['value'], 'timestamp': input['timestamp']}
        return output

I am creating the Pulsar function with pulsar-admin like this:

pulsar-admin functions create \
  --py copyFunction.py \
  --classname copyFunction.copyFunction --schema-type SensorSchema --output-serde-classname copyFunction.AvroSerDe \
  --tenant public \
  --namespace default \
  --name copyFunction1 \
  --inputs persistent://public/default/topic1 \
  --output persistent://public/default/topic2

and the sink like this:

pulsar-admin sinks create --tenant public --namespace default --name influxdb-test-sink --sink-type influxdb-connector  --sink-config-file influxdb.yaml  --topics-pattern "public/default/.*"

The problem is the following:

  1. The input topic (topic1) is created by a Java producer. At first my Pulsar function is not started, so my sink copies only topic1 to the DB, and everything works well.
  2. I start the Pulsar function; it creates topic2 correctly, and if I start a consumer on topic2 I correctly see the values generated by the function.
  3. The sink crashes, showing this exception:
20:19:51.224 [public/default/influxdb-test-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/influxdb-test-sink:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-instance.jar:?]
	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:98) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:96) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:39) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.api.Schema.decode(Schema.java:95) ~[java-instance.jar:?]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:273) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:472) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
20:19:51.230 [public/default/influxdb-test-sink-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
20:19:51.233 [public/default/influxdb-test-sink-0] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/topic2] [public/default/influxdb-test-sink] Closed consumer
20:19:51.235 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/topic2] [public/default/influxdb-test-sink] Closed consumer
20:19:51.235 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/channel2] [public/default/influxdb-test-sink] Closed consumer
20:19:51.236 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/channel1] [public/default/influxdb-test-sink] Closed consumer
20:19:51.236 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a98fd] [public/default/influxdb-test-sink] Closed Topics Consumer

I understood that this should be fixed in 2.5.0, but in my case the fix does not seem to work.
I can give you any other information if needed.

nicolo-paganin commented Feb 16, 2020

I did another test; I can confirm that the same error happens in both of the following cases:

  1. the Pulsar function's output to the new topic is done using the context.publish() function
class CopyFunction(BaseFunction):
    def customProcess(self, input, context):
        output = {'value': input['value'], 'timestamp': input['timestamp']}
        sensorValue = SensorSchema(value=output['value'], timestamp=output['timestamp'])
        context.publish("topic2", sensorValue, schema=AvroSchema(SensorSchema))
  2. returning the new value
class CopyFunction(BaseFunction):
    def customProcess(self, input, context):
        output = {'value': input['value'], 'timestamp': input['timestamp']}
        return output

nicolo-paganin commented

The Pulsar logs show this:

21:51:43.791 [function-timer-thread-85-1] ERROR org.apache.pulsar.functions.runtime.process.ProcessRuntime - Extracted Process death exception
java.lang.RuntimeException:
	at org.apache.pulsar.functions.runtime.process.ProcessRuntime.tryExtractingDeathException(ProcessRuntime.java:383) ~[org.apache.pulsar-pulsar-functions-runtime-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.runtime.process.ProcessRuntime.isAlive(ProcessRuntime.java:370) ~[org.apache.pulsar-pulsar-functions-runtime-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.runtime.RuntimeSpawner.lambda$start$0(RuntimeSpawner.java:88) ~[org.apache.pulsar-pulsar-functions-runtime-2.5.0.jar:2.5.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_152]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_152]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_152]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_152]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.43.Final.jar:4.1.43.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
21:51:43.791 [function-timer-thread-85-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/influxdb-test-sink-java.lang.RuntimeException:  Function Container is dead with exception.. restarting

sijie (Member) commented Feb 16, 2020

@nicolo-paganin

I don't think you are producing the Avro records to an Avro topic.

  1. For the first function, you are using a SerDe to serialize the Python classes into bytes. So it is actually a message without a schema.

  2. I don't think context.publish in Python functions supports Avro schemas.

--

To verify that the JDBC sink works as expected, I would suggest using the producer code mentioned in previous comments to produce Avro records.

nicolo-paganin commented Feb 16, 2020

@sijie Thanks for the reply

  1. Yes, I was using a custom SerDe class because of a problem with Python 2.7 (the default SerDe does return str(input).encode('utf-8'), and that .encode('utf-8') is not compatible with Python 2.7, since in 2.7 strings are already byte strings). Now I am using Python 3.x, so I just removed that custom SerDe class and tried again. Unfortunately I get the same error. One thing I forgot to say: when I create the function, I also set the param --schema-type SensorSchema
pulsar-admin functions create \
  --py copyFunction.py \
  --classname copyFunction.copyFunction  \
  --schema-type SensorSchema \
  --tenant public \
  --namespace default \
  --name copyFunction1 \
  --inputs persistent://public/default/topic1 \
  --output persistent://public/default/topic2
  2. Yes, I forgot to say that I made some modifications to the python-instance; you can find them in my fork: nicolo-paganin@ca45931. I am not sure the modification covers all cases or is correct; anyway, it was an attempt. The modification makes it possible to write the AvroSchema using context.publish().

I was sure the schema was present on the topic because I can read data with a consumer, and running pulsar-admin schemas get topic2 returns the right schema:

{
  "version": 0,
  "schemaInfo": {
    "name": "topic2",
    "schema": {
      "type": "record",
      "name": "SensorSchema",
      "namespace": "it.oncode.argo.api.actors",
      "fields": [
        {
          "name": "value",
          "type": "double"
        },
        {
          "name": "timestamp",
          "type": "long"
        }
      ]
    },
    "type": "AVRO",
    "properties": {}
  }
}

@sijie do you mean the Java producer code? If yes, I can confirm that with Java producers I have no problem; the problem is with Python.

nicolo-paganin commented Feb 17, 2020

I don't know if it is useful, but before the exception it prints this GenericAvroSchema warning, "No schema found for version(1), use latest schema":

1:39:14.076 [pulsar-timer-4-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/channel2] [public/default/influxdb-test-sink] [0b9d4] Prefetched messages: 0 --- Consume throughput received: 50.00 msgs/s --- 0.01 Mbit/s --- Ack sent rate: 50.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
01:39:14.120 [public/default/influxdb-test-sink-0] WARN  org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema - No schema found for version(1), use latest schema : {
  "name": "public/default/channel1_output2",
  "schema": {
    "type": "record",
    "name": "SensorSchema",
    "namespace": "it.oncode.argo.api.actors",
    "fields": [
      {
        "name": "value",
        "type": "double"
      },
      {
        "name": "timestamp",
        "type": "long"
      }
    ]
  },
  "type": "AVRO",
  "properties": {}
}
01:39:14.136 [public/default/influxdb-test-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/influxdb-test-sink:0] Uncaught exception in Java Instance
java.lang.NullPointerException: null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-instance.jar:?]
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-instance.jar:?]
	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:98) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:96) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:39) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.api.Schema.decode(Schema.java:95) ~[java-instance.jar:?]
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:273) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:2.5.0]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:472) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]

tuteng (Member) commented Feb 17, 2020

libpulsar seems not to have been updated to version 2.5.0 yet. #6091 @nicolo-paganin

nicolo-paganin commented Feb 17, 2020

@tuteng thanks, I upgraded libpulsar to 2.5.0 by manually editing the Homebrew formula with brew edit libpulsar, like this:

class Libpulsar < Formula
  desc "Apache Pulsar C++ library"
  homepage "https://pulsar.apache.org/"
  url "https://www.apache.org/dyn/closer.cgi?path=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-src.tar.gz"
  revision 1

  bottle do
    cellar :any
    sha256 "13c37b77dee18f4bba454484b4426a6f3dad27f902e0793a56a6358897ab4f3f" => :catalina
    sha256 "ff0090b77842ac6034c4a425438f8d5b401164da4bada7ac11c4963dcdaa3a28" => :mojave
    sha256 "d73f612edf73a351ab7f90776c24670c4c77b85fd95e37999e863ff27daf89ef" => :high_sierra
  end

  depends_on "cmake" => :build
  depends_on "pkg-config" => :build
  depends_on "boost"
  depends_on "openssl@1.1"
  depends_on "protobuf"
  depends_on "snappy"
  depends_on "zstd"

  def install
    cd "pulsar-client-cpp" do
      system "cmake", ".", *std_cmake_args,
                      "-DBUILD_TESTS=OFF",
                      "-DBUILD_PYTHON_WRAPPER=OFF",
                      "-DBoost_INCLUDE_DIRS=#{Formula["boost"].include}",
                      "-DProtobuf_INCLUDE_DIR=#{Formula["protobuf"].include}",
                      "-DProtobuf_LIBRARIES=#{Formula["protobuf"].lib}/libprotobuf.dylib"
      system "make", "pulsarShared", "pulsarStatic"
      system "make", "install"
    end
  end

  test do
    (testpath/"test.cc").write <<~EOS
      #include <pulsar/Client.h>

      int main (int argc, char **argv) {
        pulsar::Client client("pulsar://localhost:6650");
        return 0;
      }
    EOS
    system ENV.cxx, "test.cc", "-L#{lib}", "-lpulsar", "-o", "test"
    system "./test"
  end
end

This is the log of the brew upgrade and brew info commands:

 ~  brew upgrade libpulsar
==> Upgrading 1 outdated package:
libpulsar 2.4.2_1 -> 2.5.0_1
==> Upgrading libpulsar
==> Downloading https://homebrew.bintray.com/bottles/libpulsar-2.5.0_1.catalina.bottle.tar.gz

curl: (22) The requested URL returned error: 404 Not Found
Error: Failed to download resource "libpulsar"
Download failed: https://homebrew.bintray.com/bottles/libpulsar-2.5.0_1.catalina.bottle.tar.gz
Warning: Bottle installation failed: building from source.
==> Downloading https://www.apache.org/dyn/closer.cgi?path=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-src.tar.gz
Already downloaded: /Users/nicolopaganin/Library/Caches/Homebrew/downloads/594e305adec94cc36760459e3848bfe7280abf71827148ad5ec65230c7716486--apache-pulsar-2.5.0-src.tar.gz
Warning: Cannot verify integrity of 594e305adec94cc36760459e3848bfe7280abf71827148ad5ec65230c7716486--apache-pulsar-2.5.0-src.tar.gz
A checksum was not provided for this resource.
For your reference the SHA-256 is: 87ffe8aa9456d1143e2436f83d3ba220f12246a90f03c6b51ed5dad0df489a3b
==> cmake . -DCMAKE_C_FLAGS_RELEASE=-DNDEBUG -DCMAKE_CXX_FLAGS_RELEASE=-DNDEBUG -DCMAKE_INSTALL_PREFIX=/usr/local/Cellar/libpulsar/2.5.0_1 -DCMAKE_BUILD_TYPE=Release -D
==> make pulsarShared pulsarStatic
==> make install
🍺  /usr/local/Cellar/libpulsar/2.5.0_1: 51 files, 14.7MB, built in 3 minutes 5 seconds
Removing: /usr/local/Cellar/libpulsar/2.4.2_1... (50 files, 14.2MB)
Removing: /Users/nicolopaganin/Library/Caches/Homebrew/libpulsar--2.4.2_1.catalina.bottle.tar.gz... (3.3MB)
==> Checking for dependents of upgraded formulae...
==> No dependents found!
 ~  brew info libpulsar
libpulsar: stable 2.5.0 (bottled)
Apache Pulsar C++ library
https://pulsar.apache.org/
/usr/local/Cellar/libpulsar/2.5.0_1 (51 files, 14.7MB) *
  Built from source on 2020-02-17 at 09:30:35
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/libpulsar.rb
==> Dependencies
Build: cmake ✔, pkg-config ✔
Required: boost ✔, openssl@1.1 ✔, protobuf ✔, snappy ✔, zstd ✔
==> Analytics
install: 142 (30 days), 427 (90 days), 1,427 (365 days)
install-on-request: 142 (30 days), 426 (90 days), 1,425 (365 days)
build-error: 0 (30 days)

Unfortunately this has not resolved the issue; I still have the same error. Any other advice, @tuteng?
Thanks!

nicolo-paganin commented

I also tried running Pulsar in a Docker container (image apachepulsar/pulsar:2.5.0) and I get the same error.
@tuteng did you manage to reproduce the problem, or does it not happen in your environment?

sijie (Member) commented Feb 17, 2020

@nicolo-paganin

  1. After upgrading to 2.5.0, did you delete the topics to clean up the old messages before running the tests again?

  2. I see you are using a regex subscription. Can you try running the sink with topic1 only first, and then with topic2 later? We need to isolate whether the problem comes from topic1, topic2, or the regex subscription.

nicolo-paganin commented Feb 17, 2020

@sijie

  1. Yes, I always start from a fresh Pulsar installation (removing the data folder). After upgrading libpulsar to 2.5.0 I restarted to be sure :-)
  2. I also tried removing the regex and using a single topic as input.

If I run this, using the topic created by the Java producer, the sink works:

./pulsar-admin sinks create --tenant public --namespace default --name influxdb-test-sink --sink-type oncode-influxdb-connector  --sink-config-file influxdb.yaml  --inputs "public/default/topic_java"

If I use this with the topic created by the Python Pulsar function (copying the values from topic_java to topic_python), I get the same java.lang.NullPointerException:

./pulsar-admin sinks create --tenant public --namespace default --name influxdb-test-sink --sink-type oncode-influxdb-connector  --sink-config-file influxdb.yaml  --inputs "public/default/topic_python"

sijie (Member) commented Feb 17, 2020

If I use this with the topic created by the Python Pulsar function (copying the values from topic_java to topic_python), I get the same java.lang.NullPointerException

Since the Python copy function is customized with some modifications, can you use a Python producer to produce Avro records into a topic and use that topic to run the sink connector? It would help to isolate whether the problem is in the Python client or in the Python function you are running.

nicolo-paganin commented Feb 17, 2020

@sijie I did the tests; here are the results:

Using a Python producer
I am using this producer:

import pulsar
import time
import calendar
from pulsar.schema import *

class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)
    
client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('topic_python_producer', schema=AvroSchema(SensorSchema))

i = 0
while(True):
    i += 1
    message = SensorSchema(value=i, timestamp=calendar.timegm(time.gmtime())*1000)
    print(message)
    producer.send(message)
    time.sleep(1)

client.close()

The sink works well: no error, and I can see the data in the database.

Using a Pulsar function
I am using this Pulsar function. I removed all my customizations, so I am using the released python-instance, and I still get the error. So the problem is related only to Pulsar functions:

from pulsar import Function
from pulsar.schema import *
import fastavro
import io

class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)


class CopyFunction(Function):
    def __init__(self):
        self.parsed_schema = SensorSchema.schema()

    def process(self, input, context):
        buffer = io.BytesIO(input)
        d = fastavro.schemaless_reader(buffer, self.parsed_schema)
        output = {'value': d['value'], 'timestamp': d['timestamp']}

        outbuffer = io.BytesIO()
        fastavro.schemaless_writer(outbuffer, self.parsed_schema, output)
        return outbuffer.getvalue()

I created my function like this, passing the --schema-type param:

./pulsar-admin functions create \
  --py copyFunction.py \
  --classname copyFunction.CopyFunction \
  --schema-type SensorSchema  \
  --tenant public \
  --namespace default \
  --name test_function1 \
  --inputs persistent://public/default/topic_java \
  --output persistent://public/default/topic_python

I don't know how that --schema-type param works: I am passing the schema name from another topic. From the logs it seems to find it, and it seems to be associated with the topic, since running pulsar-admin schemas get topic_python gives this output. Moreover, if I run any consumer on that topic, I can read the values without problems:

{
  "version": 0,
  "schemaInfo": {
    "name": "topic_python",
    "schema": {
      "type": "record",
      "name": "SensorSchema",
      "namespace": "it.oncode.argo.api.actors",
      "fields": [
        {
          "name": "value",
          "type": "double"
        },
        {
          "name": "timestamp",
          "type": "long"
        }
      ]
    },
    "type": "AVRO",
    "properties": {}
  }
}

nicolo-paganin commented Feb 17, 2020

@sijie @tuteng following the results of some tests, I found a configuration that works.

These are the tests I did, modifying params in the producer creation:

  1. If I enable batching and the schema is set (the value I am sending is an instance of the class SensorSchema): it does not work.
  2. If I remove the schema setting and disable batching (the value I am sending is a bytestring encoded with fastavro): it does not work.
  3. If I set the schema and disable batching (the value I am sending is an instance of the class SensorSchema): it works.

This is the function I modified, in apache-pulsar-2.5.0/instances/python-instance/python_instance.py:

def setup_producer(self, schema):
        if self.instance_config.function_details.sink.topic != None and \
                len(self.instance_config.function_details.sink.topic) > 0:
            self.producer = self.pulsar_client.create_producer(
                str(self.instance_config.function_details.sink.topic),
                schema=schema,
                block_if_queue_full=True,
                # batching_enabled=True,
                # batching_max_publish_delay_ms=10,
                compression_type=pulsar.CompressionType.LZ4,
                # set send timeout to be infinity to prevent potential deadlock with consumer
                # that might happen when consumer is blocked due to unacked messages
                send_timeout_millis=0,
                properties=util.get_properties(util.getFullyQualifiedFunctionName(
                    self.instance_config.function_details.tenant,
                    self.instance_config.function_details.namespace,
                    self.instance_config.function_details.name),
                    self.instance_config.instance_id)
            )
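
For reference, outside the function runtime the working combination (case 3: schema set, batching disabled) corresponds to a plain Python producer roughly like this (a minimal sketch; the broker URL and topic name are assumptions):

```python
import pulsar
from pulsar.schema import AvroSchema, Record, Double, Long

class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)

client = pulsar.Client('pulsar://localhost:6650')  # assumed local standalone

# Case 3: schema set, batching disabled -- the combination reported to work.
# Setting batching_enabled=True here would reproduce the failing case 1.
producer = client.create_producer(
    'topic_python',
    schema=AvroSchema(SensorSchema),
    batching_enabled=False)

producer.send(SensorSchema(value=1.0, timestamp=1581984000000))
client.close()
```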

sijie (Member) commented Feb 18, 2020

@nicolo-paganin

  1. If I remove the schema setting and I disable the batching (the value that I am sending is a bytestring encoded with fastavro):

The second option will not work, since the JDBC sink only works with topics that have a schema.

Can you reproduce the issue (options 1 and 3) with a pure Pulsar Python client? It would be better to see whether the problem is on the Python client side or not.

sijie pushed a commit that referenced this issue Mar 6, 2020
### Motivation

Master Issue: #5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() will be overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated per topic.
### Verifying this change
Add a schema test for it.
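
The scenario that commit describes, a single consumer subscribed to multiple topics with schemas, looks roughly like this from the Python client side (a minimal sketch with hypothetical topic names; the actual fix is in the Java client consumer used by sinks):

```python
import pulsar
from pulsar.schema import AvroSchema, Record, Double, Long

class SensorSchema(Record):
    value = Double(required=True)
    timestamp = Long(required=True)

client = pulsar.Client('pulsar://localhost:6650')  # assumed local standalone

# One consumer over several topics: per the commit message, each per-topic
# consumer needs its own clone of the schema, otherwise the schema info
# provider set by the last topic overwrites the others.
consumer = client.subscribe(
    ['persistent://public/default/topic1',
     'persistent://public/default/topic2'],
    subscription_name='multi-topic-sub',
    schema=AvroSchema(SensorSchema))

msg = consumer.receive()
print(msg.value().value, msg.value().timestamp)
client.close()
```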
kaynewu added a commit to kaynewu/pulsar that referenced this issue Mar 10, 2020
* [Issue 5904]Support `unload` all partitions of a partitioned topic (apache#6187)

Fixes apache#5904 

### Motivation
Pulsar supports unload a non-partitioned-topic or a partition of a partitioned topic. If there has a partitioned topic with too many partitions, users need to get all partition and unload them one by one. We need to support unload all partition of a partitioned topic.

* [Issue 4175] [pulsar-function-go] Create integration tests for Go Functions for production-readiness (apache#6104)

This PR is to provide integration tests that test execution of Go functions that are managed by the Java FunctionManager. This will allow us to test things like behavior during function timeouts, heartbeat failures, and other situations that can only be effectively tested in an integration test. 

Master issue: apache#4175
Fixes issue: apache#6204 

### Modifications

We must add Go to the integration testing logic. We must also build the Go dependencies into the test Dockerfile to ensure the Go binaries are available at runtime for the integration tests.

* [Issue 5999] Support create/update tenant with empty cluster (apache#6027)

### Motivation

Fixes apache#5999

### Modifications

Add the logic to handle the blank cluster name.

* Introduce maxMessagePublishBufferSizeInMB configuration to avoid broker OOM (apache#6178)

Motivation
Introduce maxMessagePublishBufferSizeInMB configuration to avoid broker OOM.

Modifications
If the processing message size exceeds this value, the broker will stop reading data from the connection. When the available size is greater than half of maxMessagePublishBufferSizeInMB, the broker resumes auto-reading data from the connection.

* Enable get precise backlog and backlog without delayed messages. (apache#6310)

Fixes apache#6045 apache#6281 

### Motivation

Enable get precise backlog and backlog without delayed messages.

### Verifying this change

Added new unit tests for the change.

* KeyValue schema support for pulsar sql (apache#6325)

Fixes apache#5560

### Motivation

Currently, Pulsar SQL can't read the keyValue schema data. This PR added support Pulsar SQL reading messages with a key-value schema.

### Modifications

Add KeyValue schema support for Pulsar SQL. Add prefix __key. for the key field name.

* Avoid get partition metadata while the topic name is a partition name. (apache#6339)

Motivation

To avoid get partition metadata while the topic name is a partition name.
Currently, if users want to skip all messages for a partitioned topic or unload a partitioned topic, the broker will call get topic metadata many times. For a topic with the partition name, it is not necessary to call get partitioned topic metadata again.

* explicit statement env 'BOOKIE_MEM' and 'BOOKIE_GC' for values-mini.yaml (apache#6340)

Fixes apache#6338

### Motivation
This commit started while I was using helm in my local minikube, noticed that there's a mismatch between `values-mini.yaml` and `values.yaml` files. At first I thought it was a copy/paste error. So I created apache#6338;

Then I looked into the details of how these env vars [were used](https://github.com/apache/pulsar/blob/28875d5abc4cd13a3e9cc4f59524d2566d9f9f05/conf/bkenv.sh#L36), and found out it's OK to use `PULSAR_MEM` as an alternative. But it introduces problems:
1. Since `BOOKIE_GC` was not defined, the default [BOOKIE_EXTRA_OPTS](https://github.com/apache/pulsar/blob/28875d5abc4cd13a3e9cc4f59524d2566d9f9f05/conf/bkenv.sh#L39) will finally use the default value of `BOOKIE_GC`, and thus would override the JVM parameters defined earlier in `PULSAR_MEM`.
2. May cause problems when bootstrap scripts changed in later dev, better to make it explicitly.

So I created this PR to solve the above problems (hidden trouble).

### Modifications

As mentioned above, I've made such modifications below:
1. make `BOOKIE_MEM` and `BOOKIE_GC` explicit in `values-mini.yaml` file.  Keep up with the format in`values.yaml` file.
2. remove all  print-gc-logs related args. Considering the resource constraints of minikube environment. The removed part's content is `-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintHeapAtGC -verbosegc -XX:G1LogLevel=finest`
3. leave `PULSAR_PREFIX_dbStorage_rocksDB_blockCacheSize` empty as usual, as [conf/standalone.conf#L576](https://github.com/apache/pulsar/blob/df152109415f2b10dd83e8afe50d9db7ab7cbad5/conf/standalone.conf#L576) says it would to use 10% of the direct memory size by default.

* Fix java doc for key shared policy. (apache#6341)

The key shared policy does not support setting the maximum key hash range, so fix the java doc.

* client: make SubscriptionMode a member of ConsumerConfigurationData (apache#6337)

Currently, SubscriptionMode is a parameter to create ConsumerImpl, but it is not exported, and users could not set this value for a consumer. This change makes SubscriptionMode a member of ConsumerConfigurationData, so users can set this parameter when creating a consumer.

* Windows CMake corrections (apache#6336)

* Corrected method of specifying Windows path to LLVM tools

* Fixing windows build

* Corrected the dll install path

* Fixing pulsarShared paths

* remove future.join() from PulsarSinkEffectivelyOnceProcessor (apache#6361)

* use checkout@v2 to avoid fatal: reference is not a tree (apache#6386)

"fatal: reference is not a tree" is a known issue in actions/checkout#23 and fixed in checkout@v2, update checkout used in GitHub actions.

* [Pulsar-Client] Stop shade snappy-java in pulsar-client-shaded (apache#6375)

Fixes apache#6260 

Snappy, like the other compression codecs (LZ4, ZSTD), depends on native libraries to do the real encode/decode work. When we shade it into a fat jar, only the Java implementation of the Snappy classes is shaded, leaving the JNI layer incompatible with the underlying C++ code.

We should just remove the shading for Snappy and let Maven import its lib as a dependency.

I've tested the shaded jar generated locally by this PR; it works for all compression codecs.

* Fix CI not triggered (apache#6397)

In apache#6386, checkout@v2 was brought in for checkout.

However, it checks out the PR merge commit by default, which breaks the diff-only action that looks for the commits a PR is based on, and makes all tests be skipped.

This PR fixes the issue and has been proven to work with the apache#6396 Brokers/unit-tests.

* [Issue 6355][HELM] autorecovery - could not find or load main class (apache#6373)

This applies the recommended fix from
apache#6355 (comment)

Fixes apache#6355

### Motivation

This PR corrects the configmap data which was causing the autorecovery pod to crashloop
with `could not find or load main class`

### Modifications

Updated the configmap var data per [this comment](apache#6355 (comment)) from @sijie

* Creating a topic does not wait for creating cursor of replicators (apache#6364)

### Motivation

Creating a topic does not wait for the replicator cursors to be created.

### Verifying this change

The existing unit tests cover this change.

* [Reader] Should set either start message id or start message from roll back duration. (apache#6392)

Currently, when constructing a reader, users can set both the start message id and the start time.

This is strange, and the behavior should be forbidden.
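The two mutually exclusive ways to position a reader look roughly like this (a sketch against the public `ReaderBuilder` API; topic names are placeholders):

```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

public class ReaderStartExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Either give an explicit start position ...
        Reader<byte[]> fromId = client.newReader()
                .topic("my-topic")
                .startMessageId(MessageId.earliest)
                .create();

        // ... or roll back by a duration from now -- but not both.
        Reader<byte[]> fromTime = client.newReader()
                .topic("my-topic")
                .startMessageFromRollbackDuration(1, TimeUnit.HOURS)
                .create();

        fromId.close();
        fromTime.close();
        client.close();
    }
}
```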

* Seek to the first one >= timestamp (apache#6393)

The current logic for `resetCursor` by timestamp is odd: the first message it returns is the last message published earlier than or at the designated timestamp. Emitting this "earlier" message should be avoided.
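A sketch of the intended contract after the fix, assuming an already-subscribed consumer (`Consumer#seek(long)` resets the cursor by publish time):

```
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

public class SeekByTimestampExample {
    // After seeking to a timestamp, the first delivered message should be the
    // first one with publishTime >= timestamp, not the last earlier one.
    static void seekAndCheck(Consumer<byte[]> consumer) throws PulsarClientException {
        long timestamp = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(10);
        consumer.seek(timestamp);
        Message<byte[]> first = consumer.receive();
        System.out.println(first.getPublishTime() >= timestamp); // expected: true
    }
}
```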

* [Minor] Fix java code errors reported by lgtm.  (apache#6398)

Four kinds of errors are fixed in this PR:

- Array index out of bounds
- Inconsistent equals and hashCode
- Missing format argument
- Reference equality test of boxed types

According to https://lgtm.com/projects/g/apache/pulsar/alerts/?mode=tree&severity=error&id=&lang=java
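For instance, the reference-equality-of-boxed-types class of error comes down to this (a generic Java illustration, not code from the patch):

```
public class BoxedEqualityPitfall {
    public static void main(String[] args) {
        Integer a = 1000, b = 1000;
        System.out.println(a == b);      // false: compares references, not values
        System.out.println(a.equals(b)); // true: compares values

        // Small values are misleading: the Integer cache shares boxes in -128..127.
        Integer c = 100, d = 100;
        System.out.println(c == d);      // true, but only because of the cache
    }
}
```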

* [Java Reader Client] Start reader inside batch result in read first message in batch. (apache#6345)

Fixes apache#6344 
Fixes apache#6350

The bug was introduced in apache#5622 by changing the skip logic incorrectly.

* Fix broker to specify a list of bookie groups. (apache#6349)

### Motivation

Fixes apache#6343

### Modifications

Add a method to cast object value to `String`.

* Fixed enum package not found (apache#6401)

Fixes apache#6400

### Motivation
This problem is blocking the current test. Version 1.1.8 of `enum34` seems to have some problems, and the problem reproduces as follows.

Using the latest Pulsar code:
```
cd pulsar
mvn clean install -DskipTests
docker pull apachepulsar/pulsar-build:ubuntu-16.04
docker run -it -v $PWD:/pulsar --name pulsar apachepulsar/pulsar-build:ubuntu-16.04 /bin/bash
docker exec -it pulsar /bin/bash
cmake .
make -j4 && make install 
cd python
python setup.py bdist_wheel
pip install dist/pulsar_client-*-linux_x86_64.whl
```
`pip show enum34`
```
Name: enum34
Version: 1.1.8
Summary: Python 3.4 Enum backported to 3.3, 3.2, 3.1, 2.7, 2.6, 2.5, and 2.4
Home-page: https://bitbucket.org/stoneleaf/enum34
Author: Ethan Furman
Author-email: ethan@stoneleaf.us
License: BSD License
Location: /usr/local/lib/python2.7/dist-packages
Requires:
Required-by: pulsar-client, grpcio
```

```
root@55e06c5c770f:/pulsar/pulsar-client-cpp/python# python
Python 2.7.12 (default, Oct  8 2019, 14:14:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from enum import Enum, EnumMeta
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: No module named enum
>>> exit()
```

There is no problem with using 1.1.9 in the test.

### Modifications

* Upgrade enum34 from 1.1.8 to 1.1.9

### Verifying this change

Local tests pass.

* removed comma from yaml config (apache#6402)

* Fix broker client tls settings error (apache#6128)

When the broker creates its internal client, it sets tlsTrustCertsFilePath to "getTlsCertificateFilePath()", but it should be "getBrokerClientTrustCertsFilePath()".

* [Issue 3762][Schema] Fix the problem with parsing of an Avro schema related to shading in pulsar-client. (apache#6406)

Motivation
Avro schemas are quite important for proper data flow, and it is a pity that issue apache#3762 stayed untouched for so long. There were some workarounds for making Pulsar use the original Avro schema, but in the end it is pretty hard to run an enterprise solution on workarounds. With this PR I would like to find a solution to the problem caused by shading Avro in pulsar-client. As discussed in the issue, there are two possible solutions:

1. Unshade the Avro library in the pulsar-client library. (IMHO this seems like the proper solution, but it also brings a risk of unknown side effects.)
2. Use reflection to get the original schemas from generated classes. (I went for this solution.)

Could you please comment on whether this is a proper solution for the problem? I will add tests once my approach is confirmed.

Modifications
First, we try to extract the original Avro schema from the generated class's static "SCHEMA$" field using reflection. If that doesn't work, the process falls back to generating the schema from the POJO.
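A minimal sketch of the reflection step, assuming a class generated by avro-tools, which exposes its original schema in the conventional static `SCHEMA$` field (the helper name here is illustrative):

```
import java.lang.reflect.Field;
import org.apache.avro.Schema;

public class OriginalSchemaExtractor {
    // Returns the original Avro schema embedded in a generated class, or null
    // so the caller can fall back to generating the schema from the POJO.
    static Schema extractOriginalSchema(Class<?> generatedClass) {
        try {
            Field schemaField = generatedClass.getDeclaredField("SCHEMA$");
            schemaField.setAccessible(true);
            return (Schema) schemaField.get(null);
        } catch (ReflectiveOperationException e) {
            return null; // not an Avro-generated class: use the POJO-based schema
        }
    }
}
```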

* Remove duplicated lombok annotations in the tests module (apache#6412)

* Add verification for SchemaDefinitionBuilderImpl.java (apache#6405)

### Motivation

Add verification for SchemaDefinitionBuilderImpl.java

### Verifying this change

Added a new unit test.

* Cleanup pom files in the tests module (apache#6421)

### Modifications

- Removed dependencies on test libraries that were already imported in the parent pom file.

- Removed groupId tags that are inherited from the parent pom file.

* Update BatchReceivePolicy.java (apache#6423)

BatchReceivePolicy implements Serializable.

* Consumer receives duplicated delayed messages upon restart

Fix a case where, after a delayed message is sent, a restarting consumer pulls duplicate messages. apache#6403

* Bump netty version to 4.1.45.Final (apache#6424)

netty 4.1.43 has a bug preventing it from using the Linux native Epoll transport.

This results in Pulsar brokers falling back to NioEventLoopGroup even when running on Linux.

The bug is fixed in the netty 4.1.45.Final release.

* Fix publish buffer limit does not take effect

Motivation
If maxMessagePublishBufferSizeInMB is set to a value greater than Integer.MAX_VALUE / 1024 / 1024, the publish buffer limit does not take effect. The reason is that maxMessagePublishBufferBytes is always 0 when the following calculation method is used:

pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024 * 1024;

So it is changed to:

pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
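The root cause is ordinary int overflow; a minimal self-contained sketch of the two expressions:

```
public class PublishBufferOverflow {
    public static void main(String[] args) {
        // 4096 MB exceeds Integer.MAX_VALUE / 1024 / 1024 (= 2047).
        int maxMessagePublishBufferSizeInMB = 4096;

        // int multiplication wraps around before the widening to long happens.
        long broken = maxMessagePublishBufferSizeInMB * 1024 * 1024;
        // Long literals promote the whole expression to long arithmetic.
        long fixed = maxMessagePublishBufferSizeInMB * 1024L * 1024L;

        System.out.println(broken); // 0 (4096 * 2^20 == 2^32 wraps to 0 as int)
        System.out.println(fixed);  // 4294967296
    }
}
```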

* doc: Add the missing right parenthesis (apache#6426)

* Add the missing right parenthesis

doc: The right parenthesis was missing in the `token()` line of the Pulsar Client Java code.

* Add the missing right parenthesis on line L70
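For reference, the corrected snippet in context looks roughly like this (a sketch of the documented Java client call; the token value is a placeholder):

```
import org.apache.pulsar.client.api.*;

public class TokenAuthExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                // the closing parenthesis after token(...) was the missing one
                .authentication(AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.placeholder"))
                .build();
        client.close();
    }
}
```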

* Switch from deprecated MAINTAINER tag to LABEL with maintainer's info in Dockerfile (apache#6429)

Motivation & Modification
The MAINTAINER instruction is deprecated in favor of the LABEL instruction with the maintainer's info in docker files.

* Amend the default value of . (apache#6374)

* Fix the bug of authenticationData not being initialized. (apache#6440)

Motivation
Fix the bug that authenticationData isn't initialized.

The method org.apache.pulsar.proxy.server.ProxyConnection#handleConnect doesn't initialize the value of authenticationData,
and because of this bug you get a null value from the method org.apache.pulsar.broker.authorization.AuthorizationProvider#canConsumeAsync
when implementing the org.apache.pulsar.broker.authorization.AuthorizationProvider interface.

Modifications
Initialize the value of authenticationData in the method org.apache.pulsar.proxy.server.ProxyConnection#handleConnect.

Verifying this change
Implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface and verify that authenticationData has a value.

* Remove duplicated test libraries in POM dependencies (apache#6430)

### Motivation
The removed test libraries were already defined in the parent pom

### Modification
Removed duplicated test libraries in POM dependencies

* Add a message on how to make log refresh immediately when starting a component (apache#6078)

### Motivation

Some users may be confused by pulsar/bookie logs that are not flushed immediately.

### Modifications

Add a message in `bin/pulsar-daemon` when starting a component.

* Close ZK before canceling future with exception (apache#6228) (apache#6399)

Fixes apache#6228

* [Flink-Connector]Get PulsarClient from cache should always return an open instance (apache#6436)

* Update sidebars.json (apache#6434)

The referenced markdown files do not exist and so the "Next" and "Previous" buttons on the bottom of pages surrounding them result in 404 Not Found errors

* [Broker] Create namespace failed when TLS is enabled in PulsarStandalone (apache#6457)

When starting Pulsar in standalone mode with TLS enabled, it will fail to create two namespaces during startup.

This is because it's using the unencrypted URL/port while constructing the PulsarAdmin client.

* Update version-2.5.0-sidebars.json (apache#6455)

The referenced markdown files do not exist and so the "Next" and "Previous" buttons on the bottom of pages surrounding them result in 404 Not Found errors

* [Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ (apache#6391)

### Motivation
Fix apache#6168.
>On the C++ lib, as shown in the following log, unacked messages are redelivered after about 2 * unAckedMessagesTimeout.

### Modifications
Same as apache#3118: fixed `UnackedMessageTracker` by using time partitions.
- Add `TickDurationInMs`.
- Add `redeliverUnacknowledgedMessages`, which requires `MessageIds`, to `ConsumerImpl`, `MultiTopicsConsumerImpl` and `PartitionedConsumerImpl`.

* [ClientAPI]Fix hasMessageAvailable() (apache#6362)

Fixes apache#6333 

Previously, `hasMoreMessages` was tested as:
```
return lastMessageIdInBroker.compareTo(lastDequeuedMessage) == 0
                && incomingMessages.size() > 0;
```
However, `incomingMessages` can be empty when the consumer/reader has just started and hasn't received any messages yet.

In this PR, the last entry is retrieved and decoded to get the message metadata, so the batchIndex field can be populated.

* Use System.nanoTime() instead of System.currentTimeMillis() (apache#6454)

Fixes apache#6453 

### Motivation
`ConsumerBase` and `ProducerImpl` use `System.currentTimeMillis()` to measure the elapsed time in the 'operations' inner classes (`ConsumerBase$OpBatchReceive` and `ProducerImpl$OpSendMsg`).

An instance variable `createdAt` is initialized with `System.currentTimeMillis()`, but it is not used for reading wall-clock time; the variable is only used for computing elapsed time (e.g. the timeout for a batch).

When the variable is used to compute elapsed time, it makes more sense to use `System.nanoTime()`.

### Modifications

The instance variable `createdAt` in `ConsumerBase$OpBatchReceive` and `ProducerImpl$OpSendMsg` is initialized with `System.nanoTime()`. Usage of the variable is updated to reflect that it holds nano time; computations of elapsed time take the difference between the current system nano time and the `createdAt` variable.

The `createdAt` field is package-private and is currently only used in the declaring class and outer class, limiting the chances of unwanted side effects.
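In isolation, the measurement pattern looks like this generic sketch (not the patched classes themselves):

```
import java.util.concurrent.TimeUnit;

public class ElapsedTimeExample {
    public static void main(String[] args) throws InterruptedException {
        // Monotonic clock: safe for elapsed time and timeouts.
        final long createdAt = System.nanoTime();

        Thread.sleep(50); // stand-in for the operation being timed

        // System.currentTimeMillis() can jump with NTP or manual clock changes;
        // System.nanoTime() differences are immune to wall-clock adjustments.
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdAt);
        System.out.println("elapsed: " + elapsedMs + " ms");
    }
}
```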

* Fixed the max backoff configuration for lookups (apache#6444)

* Fixed the max backoff configuration for lookups

* Fixed test expectation

* More test fixes

* upgrade scala-maven-plugin to 4.1.0 (apache#6469)

### Motivation
The Pulsar examples include some third-party libraries with security vulnerabilities.
- log4j-core-2.8.1
https://www.cvedetails.com/cve/CVE-2017-5645

### Modifications

- Upgraded the version of scala-maven-plugin from 4.0.1 to 4.1.0. log4j-core-2.8.1 was installed because scala-maven-plugin depends on it.

* [pulsar-proxy] fix logging for published messages (apache#6474)

### Motivation
Proxy logging fetches an incorrect producerId for the `Send` command; because of that, logging always gets producerId 0 and fetches an invalid topic name for the log entry.

### Modification
Fixed topic logging by fetching correct producerId for `Send` command.

* [Issue 6394] Add configuration to disable auto creation of subscriptions (apache#6456)

### Motivation

Fixes apache#6394

### Modifications

- provide a flag `allowAutoSubscriptionCreation` in `ServiceConfiguration`, defaults to `true`
- when `allowAutoSubscriptionCreation` is disabled and the specified subscription (`Durable`) on the topic does not exist when trying to subscribe via a consumer, the server should reject the request directly by `handleSubscribe` in `ServerCnx`
- create the subscription on the coordination topic if it does not exist when initializing `WorkerService`

* Make tests more stable by using JSONAssert equals (apache#6435)

Similar to the change already merged for AvroSchemaTest.java (apache#6247):
`jsonSchema.getSchemaInfo().getSchema()` in `pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java` returns a JSON object that is compared with the hard-coded JSON string `schemaJson`; however, the order of entries in `schemaJson` is not guaranteed. Similarly, the test `testKeyValueSchemaInfoToString` in `pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java` compares a JSON object with the hard-coded JSON string `havePrimitiveType`, whose entry order is likewise not guaranteed.

This PR proposes to use JSONAssert and modify the corresponding JSON test assertions so that the tests are more stable.

### Motivation

Use JSONAssert and modify the corresponding JSON test assertions so that the tests are more stable.

### Modifications

Added an `assertJSONEqual` method and replaced `assertEquals` with it in the tests `testAllowNullSchema`, `testNotAllowNullSchema` and `testKeyValueSchemaInfoToString`.
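As an illustration of why lenient JSON comparison stabilizes these tests, a minimal sketch using JSONAssert (the JSON strings are placeholders, not the actual schema payloads):

```
import org.skyscreamer.jsonassert.JSONAssert;

public class JsonAssertExample {
    public static void main(String[] args) throws Exception {
        // Same entries, different order: a plain string equality check fails,
        // but a lenient JSON comparison passes.
        String expected = "{\"name\":\"foo\",\"type\":\"record\"}";
        String actual   = "{\"type\":\"record\",\"name\":\"foo\"}";
        JSONAssert.assertEquals(expected, actual, false); // false = lenient mode
    }
}
```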

* Avoid calling ConsumerImpl::redeliverMessages() when message list is empty (apache#6480)

* [pulsar-client] fix deadlock on send failure (apache#6488)

* Enhance Authorization by adding TenantAdmin interface (apache#6487)

* Enhance Authorization by adding TenantAdmin interface

* Remove debugging comment

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>

* Independent schema is set for each consumer generated by topic (apache#6356)

### Motivation

Master Issue: apache#5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.

* Fix memory leak when running topic compaction. (apache#6485)


Fixes apache#6482

### Motivation
Prevent topic compaction from leaking direct memory

### Modifications

Several leaks were discovered using Netty leak detection and code review.
* `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry` but did not release the underlying buffers. Fix: iterate through the `Enumeration` and release the underlying buffers. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with a message (it will be logged by Caffeine).
* There were two main sources of leaks in `TwoPhaseCompactor`. The `RawBatchConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload), and the `ByteBuf` returned by `RawBatchConverter.rebatchMessage` was not closed either. The first one was easy to fix (release the buffer). To fix the second one and make the code easier to read, I decided not to let `RawBatchConverter.rebatchMessage` close the message read from the topic; instead, that message is closed in a try/finally clause surrounding most of the method body that handles a message from the topic (in the phase-two loop). Then, if `RawBatchConverter.rebatchMessage` produced a new message, it is released after it has been added to the compacted ledger.

### Verifying this change
Modified `RawReaderTest.testBatchingRebatch` to show the new contract.

One can run the described test to reproduce the issue and verify that no leak is detected.
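The ownership pattern the fix applies can be sketched generically with Netty reference counting (`process` is a hypothetical stand-in for the code that consumes the buffer):

```
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufOwnership {
    public static void main(String[] args) {
        // Hypothetical stand-in for a payload read from the topic.
        ByteBuf payload = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
        try {
            process(payload); // consumer of the buffer
        } finally {
            payload.release(); // the owner releases on every path: no leak
        }
    }

    static void process(ByteBuf buf) {
        System.out.println("readable bytes: " + buf.readableBytes());
    }
}
```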

* Fix create partitioned topic with a substring of an existing topic name. (apache#6478)

Fixes apache#6468

Fix creating a partitioned topic with a substring of an existing topic name, and make partitioned-topic creation async.

* Bump jcloud version to 2.2.0 and remove jcloud-shade module (apache#6494)

In jclouds 2.2.0, the [gson is shaded internally](https://issues.apache.org/jira/browse/JCLOUDS-1166). We could safely remove the jcloud-shade module as a cleanup.

* Refactor tests in pulsar client tools test (apache#6472)

### Modifications

The main modification was the reduction of repeated initialization of the variables in the tests.

* Fix Topic metrics documentation (apache#6495)

### Motivation

Motivation is to have correct reference-metrics documentation.

### Modifications

There is an error in the `Topic metrics` section

`pulsar_producers_count` => `pulsar_in_messages_total`

* [pulsar-client] remove duplicate cnx method (apache#6490)

### Motivation
Remove duplicate `cnx()` method for `producer`

* [proxy] Fix proxy routing to functions worker (apache#6486)

### Motivation


Currently, the proxy only proxies the v1/v2 function routes to the
function worker.

### Modifications

This changes the code to proxy all routes for the function worker when
those routes match. At the moment this is still a static list of
prefixes, but in the future it may be possible to fetch this list of
prefixes dynamically from the REST routes.

### Verifying this change
- added some tests to ensure the routing works as expected

* Fix some async method problems at PersistentTopicsBase. (apache#6483)

* Instead of always using admin access for topic, use read/write/admin access for topic (apache#6504)

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>

* [Minor]Remove unused property from pom (apache#6500)

This PR is a follow-up of apache#6494

* [pulsar-common] Remove duplicate RestException references (apache#6475)

### Motivation
Right now, various Pulsar modules carry their own duplicate `RestException` class, so the repo has multiple duplicates of the same class. Move `RestException` to a common place and have all modules use the same exception class to avoid the duplication.

* pulsar-proxy: use the correct name for the proxy thread executor (apache#6460)

### Motivation
Use the correct name for the proxy thread executor.

* Add subscribe initial position for consumer cli. (apache#6442)

### Motivation

In some cases, users expect to consume messages from the beginning, similar to the `--from-beginning` option of the Kafka consumer CLI.

### Modifications

Add `--subscription-position` for `pulsar-client` and `pulsar-perf`.
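For comparison, the Java client API exposes the same choice through `SubscriptionInitialPosition`; a sketch (topic and subscription names are placeholders):

```
import org.apache.pulsar.client.api.*;

public class FromBeginningExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Equivalent of --subscription-position Earliest on the CLI.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("from-beginning")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        consumer.close();
        client.close();
    }
}
```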

* [Cleanup] Log format does not match arguments (apache#6509)

* Start namespace service and schema registry service before start broker. (apache#6499)

### Motivation

If the broker service is started first, clients can connect to the broker and send requests that depend on the namespace service, so we should create the namespace service before starting the broker service. Otherwise, an NPE occurs.

![image](https://user-images.githubusercontent.com/12592133/76090515-a9961400-5ff6-11ea-9077-cb8e79fa27c0.png)

![image](https://user-images.githubusercontent.com/12592133/76099838-b15db480-6006-11ea-8f39-31d820563c88.png)


### Modifications

Move the namespace service creation and the schema registry service creation to before the broker service is started.

* [pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (apache#6498)

### Motivation
Because of apache#6391, acked messages were counted as unacked messages.
Although messages from brokers were acknowledged, the following log was output.

```
2020-03-06 19:44:51.790 INFO  ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650]
my-message-0: Fri Mar  6 19:45:05 2020
my-message-1: Fri Mar  6 19:45:05 2020
my-message-2: Fri Mar  6 19:45:05 2020
2020-03-06 19:45:15.818 INFO  UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time

```

This behavior happened on the master branch.

* [pulsar-proxy] fixing data-type of logging-level (apache#6476)

### Modification
`ProxyConfig` has a wrapper method for `proxyLogLevel` to present an `Optional` data type. After apache#3543, we can define a config param as optional without creating wrapper methods.

* [pulsar-broker] recover zk-badversion while updating cursor metadata (apache#5604)

fix test

Co-authored-by: ltamber <ltamber12@gmail.com>
Co-authored-by: Devin Bost <devinbost@users.noreply.github.com>
Co-authored-by: Fangbin Sun <sunfangbin@gmail.com>
Co-authored-by: lipenghui <penghui@apache.org>
Co-authored-by: ran <gaoran_10@126.com>
Co-authored-by: liyuntao <liyuntao58607@gmail.com>
Co-authored-by: Jia Zhai <zhaijia@apache.org>
Co-authored-by: Nick Rivera <heronr@users.noreply.github.com>
Co-authored-by: Neng Lu <freeneng@gmail.com>
Co-authored-by: Yijie Shen <henry.yijieshen@gmail.com>
Co-authored-by: John Harris <jharris-@users.noreply.github.com>
Co-authored-by: guangning <guangning@apache.org>
Co-authored-by: newur <ruwen.reddig@gmail.com>
Co-authored-by: Sergii Zhevzhyk <vzhikserg@users.noreply.github.com>
Co-authored-by: liudezhi <33149602+liudezhi2098@users.noreply.github.com>
Co-authored-by: Dzmitry Kazimirchyk <dzmitryk@users.noreply.github.com>
Co-authored-by: futeng <ifuteng@gmail.com>
Co-authored-by: bilahepan <YTgaotianci@gmail.com>
Co-authored-by: Paweł Łoziński <pawel.lozinski@gmail.com>
Co-authored-by: Ryan Slominski <ryans@jlab.org>
Co-authored-by: k2la <mzq6mft9zz@gmail.com>
Co-authored-by: Rolf Arne Corneliussen <racorn@users.noreply.github.com>
Co-authored-by: Matteo Merli <mmerli@apache.org>
Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: Rajan Dhabalia <rdhabalia@apache.org>
Co-authored-by: Sanjeev Kulkarni <sanjeevrk@gmail.com>
Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
Co-authored-by: congbo <39078850+congbobo184@users.noreply.github.com>
Co-authored-by: Ilya Mashchenko <ilya@netdata.cloud>
Co-authored-by: Addison Higham <addisonj@gmail.com>
jiazhai pushed a commit that referenced this issue Mar 13, 2020
### Motivation

Master Issue: #5454

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.
(cherry picked from commit 8003d08)
tuteng pushed a commit to AmateurEvents/pulsar that referenced this issue Mar 21, 2020
…e#6356)

### Motivation

Master Issue: apache#5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.

(cherry picked from commit 8003d08)
tuteng pushed a commit that referenced this issue Apr 6, 2020
### Motivation

Master Issue: #5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.

(cherry picked from commit 8003d08)
tuteng pushed a commit that referenced this issue Apr 13, 2020
### Motivation

Master Issue: #5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.

(cherry picked from commit 8003d08)
jiazhai pushed a commit to jiazhai/pulsar that referenced this issue May 18, 2020
…e#6356)

### Motivation

Master Issue: apache#5454

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.
(cherry picked from commit 8003d08)
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
…5930)

Fixes apache#5454 


### Motivation

The current C++ client cannot correctly obtain the schema version, resulting in a parsing error in the Java client when data with a schema is sent using the Python client.

Test code:

```
import pulsar

from pulsar.schema import *

class Test(Record):
    name = String()
    id = Integer()

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('test-producer-schema', schema=AvroSchema(Test))
producer.send(Test(name='Hello', id=1))
client.close()
```



### Modifications

* Set the schema version in the message metadata

### Verifying this change

Add a schema version check to the SchemaTest unit test
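For the Java side of that round trip, a consumer that decodes by the schema version carried in the message metadata might look like this sketch (`Schema.AUTO_CONSUME()` is the existing client API; topic and subscription names are placeholders):

```
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Decoding requires the schema version from the message metadata --
        // the field the C++/Python producer failed to set before this fix.
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic("test-producer-schema")
                .subscriptionName("schema-check")
                .subscribe();

        Message<GenericRecord> msg = consumer.receive();
        System.out.println(msg.getValue().getField("name"));

        consumer.close();
        client.close();
    }
}
```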
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
…e#6356)

### Motivation

Master Issue: apache#5454 

When one consumer subscribes to multiple topics, setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.

### Modification
Clone the schema for each consumer generated by a topic.
### Verifying this change
Add a schema test for it.