
[Doc]--Add schema example for cpp and CGo client #7127

Closed
Huanli-Meng wants to merge 167 commits into apache:master from
Huanli-Meng:Add-schema-exampe-for-cpp-and-cgo-client

Conversation

@Huanli-Meng
Contributor

(If this PR fixes a github issue, please add Fixes #<xyz>.)

Fixes #

(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)

Master Issue: #

Motivation

Explain here the context, and why you're making that change. What is the problem you're trying to solve.

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)
  • The rest endpoints: (yes / no)
  • The admin cli options: (yes / no)
  • Anything that affects deployment: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Huanli-Meng and others added 30 commits May 19, 2020 13:44
Update section: Pulsar Functions > Reference: CLI:
subsections: localrun/create/update

updated versioned docs: V2.4.1 and later releases.
* Fixed website publish failure

* Fixed ko language build failure
### Motivation

* Add apache pulsar 2.5.1 features blog
…he#6795)

### Motivation

Fix some empty message related problems in the compacted topic.

1. Fix message deletion of a key for batch messages.
2. Fix compaction when all messages in the topic are empty. If all messages are empty, compaction should delete all messages in the compacted topic. Without this fix, the compaction task fails with an NPE and the consumer can still get all messages.
3. Seek to the compaction horizon when the last compaction task deletes all messages from the compacted topic (all previous messages are deleted by empty messages). Without this fix, the consumer will get stuck because there are no entries in the ledger used by the compacted topic.

### Verifying this change

Add unit test for the changes
…etheus (apache#6705)

## Motivation
The managed ledger read cache metrics are exported via /admin/broker-stats/metrics in JSON format, which is hard to parse, collect, and display. Moreover, the read cache is a very important module for message consumption throughput and latency, so collecting and displaying the read cache metrics is extremely important for Pulsar in production.

## Changes
I convert the JSON-format metrics to the Prometheus message type and export them on the Prometheus monitoring port, so those metrics can be displayed in Grafana.

Please help check these changes; if they are OK, I will update the metrics documentation.
…apache#6783)

The messageReceiveTimeoutMs value in the PulsarConsumerSource class is hardcoded to 100ms and cannot be altered through configuration at present.
…ache#6812)

* Fix bug: sendCallback's 2nd argument was always the default MessageId

* Set batch index for each message's callback of batch

* Add test for message id in send callback

* Ensure all send callbacks completed before ASSERT_EQ
Motivation
After the search is available, set 2.5.1 to the top
Modifications
After the search is available, set 2.5.1 to the top
### Motivation

In the Python client, the serialization of enums when using the schema is currently broken, throwing an error because it's not possible to directly serialize them into JSON.

Instead, for both Avro and JSON, we need to handle enum serialization in its own way.
* Fix encryption examples

* Minor grammar fix

Co-Authored-By: Yu Liu <50226895+Anonymitaet@users.noreply.github.com>

Co-authored-by: Yu Liu <50226895+Anonymitaet@users.noreply.github.com>
Fix apache#6620

### Changes
1. update topic name document
2. update code annotation in TopicName.java to avoid topic name confusion
Bookkeeper has settings to allow for periodic calls to refresh the
bookie info. In dynamic environments (like when running on k8s) fetching
the bookie info more often can be helpful to ensure the brokers stay up
to date on the list of bookies even in the event of zookeeper watch
issues.

Co-authored-by: Addison Higham <ahigham@instructure.com>
apache#6794)

Fixes apache#6793

### Motivation
Currently the broker can't set a maximum number of partitions per partitioned topic, so we can't prevent clients from creating partitioned topics with too many partitions on the server side.
This PR introduces a maximum-partitions limit in the broker so that it can be controlled.

### Modifications
Add a `maxNumPartitionsPerPartitionedTopic` config to the broker and compare it with `numPartitions` when creating or updating a partitioned topic.
If the config is set to `0` (the default), the check is disabled.
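The check described above can be sketched as follows. The `maxNumPartitionsPerPartitionedTopic` name and the 0-disables semantics follow the PR; the class and exception names are illustrative placeholders, not Pulsar's actual code:

```java
// Illustrative sketch of the partition-count limit; class and exception
// names are placeholders, not Pulsar's actual code.
public class PartitionLimitCheck {
    private final int maxNumPartitionsPerPartitionedTopic;

    public PartitionLimitCheck(int maxNumPartitionsPerPartitionedTopic) {
        this.maxNumPartitionsPerPartitionedTopic = maxNumPartitionsPerPartitionedTopic;
    }

    /** Rejects numPartitions above the configured limit; 0 disables the check. */
    public void validate(int numPartitions) {
        if (maxNumPartitionsPerPartitionedTopic > 0
                && numPartitions > maxNumPartitionsPerPartitionedTopic) {
            throw new IllegalArgumentException(
                    "Number of partitions should be less than or equal to "
                            + maxNumPartitionsPerPartitionedTopic);
        }
    }

    public static void main(String[] args) {
        new PartitionLimitCheck(0).validate(1000); // 0: check disabled, passes
        new PartitionLimitCheck(10).validate(10);  // at the limit: allowed
    }
}
```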
Fixes apache#6840

Motivation
Avoid creating partitioned topic for partition name

Verifying this change
New unit test added.
Master Issue: apache#6843
## Motivation

Allow the broker to support ManagedLedger configuration.

## Modifications

This change adds support for the `managedLedgerPassword` configuration in

org.apache.pulsar.broker.ServiceConfiguration
```java
    @FieldContext(
            category = CATEGORY_STORAGE_ML,
            doc = "Default  password to use when writing to BookKeeper. \n\nDefault is ``."
        )
    private String managedLedgerPassword = "";
```
org.apache.pulsar.broker.service.BrokerService
```java
  managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
```

* support setPassword to managedLedger from broker config

Co-authored-by: dezhiliu <dezhiliu@tencent.com>
…#6825)

Master Issue: apache#6824
## Motivation

When sending messages asynchronously fails, an exception is thrown, but it does not indicate which message failed, so the user does not know which messages need to be retried.

## Modifications

On the client side, this change sets the sequenceId when throwing an exception from
org.apache.pulsar.client.api.PulsarClientException


```java
public class PulsarClientException extends IOException {
    private long sequenceId = -1;

    public PulsarClientException(String msg, long sequenceId) {
        super(msg);
        this.sequenceId = sequenceId;
    }
}
```
Client examples
```java
  producer.newMessage().sequenceId(1).value(value.getBytes())
                .sendAsync().thenAccept(msgId -> {
                    System.out.println(msgId);
                }).exceptionally(ex -> {
                    System.out.println( ((PulsarClientException)ex.getCause()).getSequenceId());
                    return null;
                });
```
…ache#6034)

Fixes apache#5633 


### Motivation

Currently, some users want Avro schema support in Debezium, so this PR adds that feature.
Kafka's Avro schema support depends on Avro 1.8, but the Avro version in Pulsar has just been upgraded to 1.9, so shading is needed to avoid failing to find the `addProp` function.

### Modifications

* Add a package `kafka-connect-avro-converter-shaded`
* Add a class `KafkaSchema` to convert Kafka's Avro schema to Pulsar's schema

### Verifying this change 

Unit test and integration tests
…pache#6851)

## Motivation

We can automatically delete inactive subscriptions by setting `subscriptionExpirationTimeMinutes` in broker.conf to a value greater than 0.
```sh
# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0
```

However, since this setting value applies to all topics, we have to set it to 0 if there is even one topic whose subscriptions should not be deleted.

### Modifications

Enable users to set a subscription expiration time for each namespace. This value overrides `subscriptionExpirationTimeMinutes` in broker.conf.
```sh
$ ./bin/pulsar-admin namespaces set-subscription-expiration-time --time 60 tenant1/ns1
$ ./bin/pulsar-admin namespaces get-subscription-expiration-time tenant1/ns1

60
```
…ache#6848)

### Motivation

JDBC sink does not handle `null` fields. For example, the field `example` can potentially be null. The schema registered in Pulsar allows for it, and the table schema in MySQL has a column of the same name, is configured as double and also allows nulls. When messages are sent to the JDBC sink without that field, an exception like this is seen:

```
21:08:38.472 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception 
java.sql.SQLException: Data truncated for column 'example' at row 1
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:127) ~[mysql-connector-java-8.0.11.jar:8.0.11]
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:95) ~[mysql-connector-java-8.0.11.jar:8.0.11]
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[mysql-connector-java-8.0.11.jar:8.0.11]
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:960) ~[mysql-connector-java-8.0.11.jar:8.0.11]
	at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:388) ~[mysql-connector-java-8.0.11.jar:8.0.11]
	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:202) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
	at org.apache.pulsar.io.jdbc.JdbcAbstractSink.lambda$open$0(JdbcAbstractSink.java:108) ~[pulsar-io-jdbc-2.5.0.nar-unpacked/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_232]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_232]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
```
Looking at the code for the JDBC sink, there was no handling of the case where the field was `null`. The PR adds code to handle that case. It also adds unit tests to cover this for both binary and JSON encoding of the schema.

### Modifications

When the sink encounters a `null` field value it uses the `setColumnNull` method to properly reflect this in the database row.
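A minimal sketch of that null-handling path: only the `setColumnNull` method name comes from the PR, while the type dispatch and class name here are simplified assumptions (the real sink dispatches on the record's schema fields):

```java
// Sketch of null-safe column binding for a JDBC sink. Only the
// setColumnNull name comes from the PR; the type dispatch is a
// simplified assumption.
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;

public class NullSafeBinder {

    /** Bind a value at the given parameter index, routing nulls to setNull. */
    public static void setColumn(PreparedStatement stmt, int index, Object value)
            throws SQLException {
        if (value == null) {
            setColumnNull(stmt, index);
        } else if (value instanceof Double) {
            stmt.setDouble(index, (Double) value);
        } else if (value instanceof Long) {
            stmt.setLong(index, (Long) value);
        } else {
            stmt.setString(index, value.toString());
        }
    }

    /** Explicitly bind SQL NULL so the driver does not truncate the column. */
    static void setColumnNull(PreparedStatement stmt, int index) throws SQLException {
        stmt.setNull(index, Types.NULL);
    }
}
```

Binding `Types.NULL` explicitly is what prevents the "Data truncated for column" error shown in the stack trace above, since the driver is never asked to coerce a missing value.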
…nctions_worker config (apache#6878)

Motivation
There are many other examples (which are commented out) in the configuration files, so it makes sense to include an example config for runtimeCustomizerClassName.

Modifications
Added example on how to use the runtimeCustomizerClassName parameter in functions_worker.yml.
Fixes apache#6872
Fixes apache#6416

If a client tries to create a producer on a topic that is currently
unloading, we can get a `RuntimeException` from
`BrokerService.checkTopicNsOwnership`, which is bubbled up through
`topic.addProducer`. Because only a `BrokerServiceException` is handled,
this results in a future that never completes, leaving producers unable
to be created if this topic is scheduled back to this broker.
Change the log level to debug to avoid noisy logs.
Master Issue: apache#5847

### Motivation

An installation error of the dashboard's PostgreSQL 11 database resulted in a startup failure.

### Modifications

* Update the apachepulsar/pulsar-dashboard:2.5.1 image https://hub.docker.com/layers/apachepulsar/pulsar-dashboard/2.5.1/images/sha256-61b47a7302639aba1357d09ca69a842c4a67bff38b230753d6bd638df0461c6b?context=explore
* Update the Dockerfile to fix the PostgreSQL 11 installation.

### Verifying this change

Local tests pass
merlimat and others added 27 commits June 1, 2020 20:31
ClientImpl::handleClose was using static variables to record the first
error on closing a client. This is just wrong: a static stack variable
in C++ acts like a global, so if errorClosing was ever set to true in
a process, all clients closed in that process after that point would
be errored with the first error.

This fixes a failure in BasicEndToEndTest.testDelayedMessages, which
happens sporadically on C++, and always when running the tests in
serial, probably due to a double close in some other test.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
…apache#7067)

This works around the issue where if the hostname of the machine has an underscore,
the test will fail to run

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
Some tests had @Test at the class level and also at the method
level. This resulted in those tests running twice.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
…7065)

### Motivation

When a namespace bundle split operation fails by timeout, we're not cleaning up the current in-memory metadata of the old bundles. This leads the broker to treat the operation as non-succeeded while in fact it might have gone through.

In this case, the broker keeps thinking that it has to split a particular bundle (based on the old load report information), which fails with ZK bad-version errors, and again we're not cleaning up the stale metadata.

### Modifications

Reverse the logic to first remove the bundle load data and then perform the split.

If the split were to fail, its load data will be re-added in any case with the next broker load report.

This will ensure that the leader load manager won't be keeping track of a non-existing bundle after a timeout in the split operation.
### Motivation

In order to have a predictable memory footprint in brokers, we need to cap the amount of bytes that we read in single range read request from BK. 

Since BK doesn't take a max bytes argument, we need instead to use a best-effort approach and adjust the max number of entries based on the average entry (not message) size on the topic.
…xpose avgMessagesPerEntry metric (apache#6719)

### Motivation
When a consumer sends a fetch request to the broker, it tells the server how many messages should be pushed to the consumer client. However, when producers use the batching feature, the broker stores data in BookKeeper or the broker cache as entries, not single messages. There is a gap in mapping the number of messages to the number of entries when handling a consumer fetch request.

Current strategy just using the following calculating formula to deal with those situation:
```
messagesToRead = Math.min(availablePermits, readBatchSize);
```
`availablePermits` is the number of message the consumer requested, and `readBatchSize` is the max read entry size set in broker.conf

Assuming `availablePermits` is 1000, `readBatchSize` is 500, and each entry contains 1000 messages, `messagesToRead` will be 500 according to this formula. The broker will read 500 entries, that is `500 * 1000 = 500,000` messages, from BookKeeper or the broker cache and push all 500,000 messages to the consumer at once even though the consumer only needs 1000. This causes the consumer to use too much memory storing fetched messages, especially when `readBatchSize` is increased to raise BookKeeper read throughput.

### Changes
I added a variable `avgMessagesPerEntry` to record the average number of messages stored in one entry. It is updated when the broker pushes messages to the consumer, using the following formula:
```
avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value
```
`avgMessagePerEntry` is the historical average number of messages per entry and `new Value` is the number of messages per entry in the entries the broker read from the cache or BookKeeper for this request. `avgPercent` is a constant 0.9 that controls how quickly the historical average decays when a new value is incorporated. The initial value of `avgMessagePerEntry` is 1000.

When handling a consumer fetch request, the broker maps the requested message count to an entry count according to the following formula:
```
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
```

I also expose the `avgMessagesPerEntry` value in the consumer stats metric JSON.
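The moving average and the permit-to-entry mapping above can be sketched together. The 0.9 decay weight and initial value of 1000 follow the PR text; the class name is illustrative, not Pulsar's Consumer implementation:

```java
// Sketch of the avgMessagesPerEntry moving average described above.
// The 0.9 weight and initial value 1000 follow the PR text; the class
// name is illustrative only.
public class AvgMessagesPerEntryTracker {
    private static final double AVG_PERCENT = 0.9; // weight given to history
    private double avgMessagesPerEntry = 1000;     // initial value per the PR

    /** Update the average with the messages-per-entry seen in a new read. */
    public void update(double messagesPerEntryInRead) {
        avgMessagesPerEntry = avgMessagesPerEntry * AVG_PERCENT
                + (1 - AVG_PERCENT) * messagesPerEntryInRead;
    }

    /** Map requested message permits to an entry count, capped by readBatchSize. */
    public int entriesToRead(int availablePermits, int readBatchSize) {
        return Math.min((int) Math.ceil(availablePermits / avgMessagesPerEntry),
                readBatchSize);
    }

    public static void main(String[] args) {
        AvgMessagesPerEntryTracker tracker = new AvgMessagesPerEntryTracker();
        // 1000 permits at ~1000 msgs/entry map to 1 entry instead of 500
        System.out.println(tracker.entriesToRead(1000, 500));
    }
}
```

With these numbers the example from the PR text reads 1 entry (about 1000 messages) instead of 500 entries (about 500,000 messages).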
The topics used in a bunch of the cases in BasicEndToEndTest had no
unique identifier, so if you ran it multiple times against the same
standalone cluster, they would fail, as the topics would already have
some state.

This change adds a unique identifier to the topics.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
### Motivation

In Python client, we're not validating the type of the values set on the records when using schema.

This means that, through a simple mistake, an application can end up publishing data that is not compatible with the declared schema.
### Motivation

Applying the new functionality introduced in apache#7060 to hint to the underlying cursor the max size that we are comfortable to read at this time.
* update asynchttpclient version

* add default config for new version of asynchttp

* update license version

* fix ut

* add default config for asynchttp to avoid npe
…#7103)

* Moved ClassLoading and Reflections Helper functions to common

* Fix tests

* Fix test

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
Fixes apache#7032

### Motivation
Source data flow:
Custom Source-> Function-> Pulsar producer-> Broker

Sink data flow:
Broker-> Pulsar consumer-> Function-> Custom Sink

Pulsar consumer is only used in sink mode, so it is necessary to add DLQ parameters for sink mode
)

When we're increasing the number of partitions in a topic, we typically also pre-create the subscriptions on those topics. In this case, we're also pre-creating the non-durable subscriptions of the readers.

These are getting created as durable subscriptions on the new partitions and will be leaked and causing backlogs.

Instead, we don't need to pre-create anything for readers (non-durable subscriptions).

* Fixed increasing number of partitions with attached readers

* Fixed test expectation after merge
* [Doc] Update for acknowledgment at batch index level

* Update site2/docs/concepts-messaging.md
Negative acknowledgement runs in the background on a consumer and
triggers redelivery of messages. The tests verify that messages do
indeed get redelivered, and which messages they are, for the base
case, batching, and the partitioned consumer.

There's a fundamental dependency on timing in the base case. If 100ms
pass between consumer creation and receiving the last message in the first
receive loop, redelivery will be triggered and the test's assertion on
message order will fail.

This first case can be fixed by moving the negative ack to run after
all messages have been received. However, this can also then fail for
the batch case.

If the negative ack tracker kicks off during the loop to negatively
ack the messages, then the redelivery will happen twice (and possibly
more times, depending on how many times it manages to run).

For this reason, if we want the test to be deterministic, we need to
prevent the tracker from kicking off redelivery while we mark the
messages as negatively acked.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
Master Issue: apache#7041

### Motivation

When a leader broker is restarted, some producers for topics owned by that broker may not be reopened on the new broker. When this happens, message publishing will continue to fail until the client application is restarted.

As a result of the investigation, I found that lookup requests sent by the producers in question are redirected more than 10,000 times between multiple brokers.

When a lookup request is redirected, `BinaryProtoLookupService#findBroker()` is called recursively. Therefore, tens of thousands of redirects will cause `StackOverflowError` and `BinaryProtoLookupService#findBroker()` will never complete.

### Modifications

Limit the number of times a lookup is redirected to 100. This maximum is user configurable. If the number of redirects exceeds 100, the lookup will fail. But `ConnectionHandler` retries lookup so that the producer can eventually reconnect to the new broker.
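The redirect cap can be sketched as an iterative loop with a counter, which avoids both the unbounded recursion behind the `StackOverflowError` and infinite redirect cycles. The `maxLookupRedirects` name follows the PR; `LookupResult` and the resolver function are hypothetical stand-ins for the real binary lookup RPC:

```java
// Iterative sketch of the lookup redirect cap. maxLookupRedirects follows
// the PR; LookupResult and the resolver are hypothetical stand-ins.
import java.util.function.Function;

public class BoundedLookup {

    /** Holds either a final broker URL or the next address to redirect to. */
    public static final class LookupResult {
        public final String brokerUrl;   // non-null when the lookup is done
        public final String redirectTo;  // non-null when redirected

        public LookupResult(String brokerUrl, String redirectTo) {
            this.brokerUrl = brokerUrl;
            this.redirectTo = redirectTo;
        }
    }

    /** Resolve iteratively, failing once the redirect budget is exhausted. */
    public static String findBroker(String address,
                                    Function<String, LookupResult> resolve,
                                    int maxLookupRedirects) {
        for (int redirects = 0; redirects <= maxLookupRedirects; redirects++) {
            LookupResult result = resolve.apply(address);
            if (result.brokerUrl != null) {
                return result.brokerUrl;
            }
            address = result.redirectTo;
        }
        throw new IllegalStateException(
                "Lookup exceeded " + maxLookupRedirects + " redirects");
    }
}
```

Because the loop replaces recursion, tens of thousands of redirects can no longer overflow the stack; they simply hit the cap and fail fast so the connection handler can retry.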
There seems to be a race when mocking the compactor in
MockedPulsarServiceBaseTest which yields errors like:
```
Caused by: java.lang.ClassCastException: org.apache.pulsar.compaction.TwoPhaseCompactor$MockitoMock$1141048386 cannot be cast to org.apache.pulsar.broker.service.BrokerService
```

This obviously causes tests to fail in strange and surprising
ways. I've not been able to reproduce, but the issue seems to be with
how we mock the compactor.

We don't want to have to construct the whole compactor as we just want
to spy on it, so we get the current compactor, wrap it in spy and tell
the PulsarService instance to return it when getCompactor is called.

However, we are doing this after we have already called
PulsarService#start(). So there are threads already using the mock
structures. The mock structures themselves are not thread safe, so
modifying them while they are in use, is not a good idea.

The fix is to finish mocking before invoking #start(). To do this,
I've broken out the Compactor construction method in PulsarService, so
that alone can be mocked to wrap the object in a spy.

Co-authored-by: Ivan Kelly <ikelly@splunk.com>
…e#7085)

* add feature: enable maxSizePerLedger settings in configuration

Co-authored-by: wuzhanpeng <wuzhanpeng@bigo.sg>
### Motivation

The current implementation of KeyShared subscriptions uses a mechanism to divide the hash space across the available consumers. This is based on dividing the currently assigned hash ranges when a new consumer joins or leaves.

There are few problems with the current approach:

 1. When adding a new consumer, the biggest range is split to make space for the new consumer. That means that when adding 3 consumers, 1 of them will "own" a hash range that is double the size of the other 2 consumers' and consequently will receive twice the traffic. This is not terrible, but not ideal either.

 2. When removing consumers, the range for the removed consumer is always assigned to the next consumer. The resulting hash distribution depends on the sequence in which consumers are removed. If one is unlucky, the traffic will be heavily skewed, with situations where 1 consumer gets >90% of the traffic.

This is an example of removing consumers in sequence, with attached the size of their respective hash ranges:

```
Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
```
As you can see, `c1` will take most of the traffic. 

Most likely it will not be able to process all the messages and the backlog builds up. 


### Modifications

 * No functional difference from user perspective
 * Use consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
 * Number of points in the ring is configurable, default=100. 
 * Refactored the current unit tests. The tests currently duplicate the logic of the implementation and check that a message is placed on the bucket for one consumer. Of course that works, since it's the same code executed on both sides. Instead, the tests should focus on the contract of the feature: messages should arrive in order, and there should be a "decent" sharing of load across consumers.
  * @codelipenghui I've removed `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance, as I've also explained in apache#6647 (comment). I'm happy to discuss it more.
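The consistent-hashing assignment can be illustrated with a minimal ring. The 100 points per consumer matches the PR's default; the hash function and class name are simplified assumptions, not Pulsar's implementation:

```java
// Minimal consistent-hash ring for key-to-consumer assignment.
// 100 points per consumer matches the PR default; hash and class
// names are simplified assumptions.
import java.util.Map;
import java.util.TreeMap;

public class ConsistentHashRing {
    private static final int POINTS_PER_CONSUMER = 100; // PR default
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    private static int hash(String s) {
        return s.hashCode() & Integer.MAX_VALUE; // force non-negative
    }

    /** Place 100 points for the consumer; existing assignments barely move. */
    public void addConsumer(String consumer) {
        for (int i = 0; i < POINTS_PER_CONSUMER; i++) {
            ring.put(hash(consumer + "-" + i), consumer);
        }
    }

    /** Remove only this consumer's points; its keys spread over the others. */
    public void removeConsumer(String consumer) {
        ring.values().removeIf(consumer::equals);
    }

    /** Pick the consumer owning the first ring point at or after the key's hash. */
    public String select(String key) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }
}
```

Because a removed consumer's points are scattered around the ring, its keys redistribute roughly evenly over the survivors instead of all landing on one neighbor, which is what prevents the skew shown in the rangeMap trace above.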
…e#7119)

Motivation
OpenAPI was missing info about body params for namespaces admin API.

Modifications
Added attributes for the body params in the API for namespaces.
@Huanli-Meng
Contributor Author

This PR is for issue 6282: #6282
It adds schema contents for the CPP and CGo clients to the master repo. After the doc is reviewed and approved, I will add it to the previous releases.

@Huanli-Meng
Contributor Author

Closing this PR; I will re-propose a new PR.

@Huanli-Meng Huanli-Meng closed this Jun 2, 2020