Skip to content

Comments

[Issue 6040][pulsar-client-go] Avoid incompatible pointer types warnings during build#8105

Closed
bschofield wants to merge 49 commits intoapache:masterfrom
bschofield:bschofield/fix-issue-6040
Closed

[Issue 6040][pulsar-client-go] Avoid incompatible pointer types warnings during build#8105
bschofield wants to merge 49 commits intoapache:masterfrom
bschofield:bschofield/fix-issue-6040

Conversation

@bschofield
Copy link
Contributor

Fixes #6040.

Motivation

As per #6040 (comment), the golang pulsar client intentionally contains code that produces warning messages at build time.

This makes effective use of IDEs (e.g. VSCode) hard, because genuine linter errors are replaced with spam messages about uninitialized pointer errors in the pulsar client.

The golang pulsar library should be a good citizen, and not produce these build-time warnings.

Modifications

Suppressed the uninitialized pointer warnings.

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API: no
  • The schema: no
  • The default values of configurations: no
  • The wire protocol: no
  • The rest endpoints: no
  • The admin cli options: no
  • Anything that affects deployment: no

Documentation

  • Does this pull request introduce a new feature? no

@bschofield
Copy link
Contributor Author

bschofield commented Sep 22, 2020

@wolfstudy would this be an acceptable change? If you prefer not to switch off warnings, I could add explicit casts.

@bschofield bschofield changed the title [Issue 6040][pulsar-client-go] Avoid incompatible pointer types warnings during build. [Issue 6040][pulsar-client-go] Avoid incompatible pointer types warnings during build Sep 22, 2020
@sijie sijie added area/client component/go type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages release/2.6.2 labels Oct 1, 2020
@sijie sijie added this to the 2.7.0 milestone Oct 1, 2020
@sijie
Copy link
Member

sijie commented Oct 1, 2020

/pulsarbot run-failure-checks

bschofield and others added 20 commits October 5, 2020 19:28
Fix #8196 

### Motivation
In BrokerService#close, it close interceptor first and close workerGroup in the second. However, in workerGroup.shutdownGracefully(), it will call ServerCnx#channelInactive method. The code with bug as follow.
```
getBrokerService().getInterceptor().onConnectionClosed(this);
```
It doesn't check whether `getBrokerService().getInterceptor()` is null and call `onConnectionClosed` may cause NullPointerException.

### Changes
1. In BrokerService#close, close workerGroup first and close interceptor in the second.
2. In ServerCnx#channelInactive, check `getBrokerService().getInterceptor()` whether is null before call onConnectionClosed method.
Fixes #5759

### Motivation
unused subscriptions will never be cleared on non-persistent topics

### Modifications
Add the `lastActive` attribute for non-persistent subscriptions. In the `checkInactiveSubscriptions` method of `NonPersistentTopic`, eligible subscriptions will be cleaned up

### Verifying this change
org.apache.pulsar.client.api.NonDurableSubscriptionTest#testDeleteInactiveNonPersistentSubscription
### Motivation & Changes
1. add null check for topic policies to avoid NPE
2. add boundary check for `MaxUnackedMessagesOnSubscription` and `SetMaxUnackedMessagesOnConsumer`
…#8191)

Fix #8124 

### Changes
Related to `PR-5571`, I add epoch on `HandleBase` for C++ Client.
Due to C++ client not expose handler interface to producer client, i can't add epoch test in the test case. I doubt whether to expose those interfaces to get handler instance and epoch just for test case.

Please take a look and give me some ideas. @sijie @jiazhai @codelipenghui
Fixes #8187

### Motivation

The length of the consumerName in pulsar-dashboard's django model was increased to 256 in #4716 however the change was not propagated by the django migrations system. The existing migrations result in a size of 64:
https://github.com/apache/pulsar/blob/e65875b99e3985630e3147c2579e91da4c3973bf/dashboard/django/stats/migrations/0001_initial.py#L56

This pull request fixes this. `./manage.py makemigrations` was used to generate this migration.

### Modifications

An additional django migration file is included to update the db to match the model.
### Motivation

When installing or running Pulsar CGo client, we get the following warning:
```
../go/pkg/mod/github.com/apache/pulsar/pulsar-client-go@v0.0.0-20201001145619-e65875b99e39/pulsar/c_go_pulsar.h: In function '_pulsar_producer_send_async':
../go/pkg/mod/github.com/apache/pulsar/pulsar-client-go@v0.0.0-20201001145619-e65875b99e39/pulsar/c_go_pulsar.h:70:5: warning: passing argument 3 of 'pulsar_producer_send_async' from incompatible pointer type [-Wincompatible-pointer-types]
     pulsar_producer_send_async(producer, message, pulsarProducerSendCallbackProxy, ctx);
     ^
```
However, we can produce messages successfully.

### Modifications

The second argument of the callback function `pulsar_send_callback` must be `pulsar_message_id_t`, not `pulsar_message_t`.
https://github.com/apache/pulsar/blob/be6a102511560349701dbe6b83fabf831dc81340/pulsar-client-cpp/include/pulsar/c/producer.h#L80-L81
https://github.com/apache/pulsar/blob/be6a102511560349701dbe6b83fabf831dc81340/pulsar-client-cpp/include/pulsar/c/producer.h#L34
Fix #7657 

### Motivation
In `GenericJsonRecord.java`, it deserialize byte to String.
```
public Object getField(String fieldName) {
        JsonNode fn = jn.get(fieldName);
        if (fn.isContainerNode()) {
            AtomicInteger idx = new AtomicInteger(0);
            List<Field> fields = Lists.newArrayList(fn.fieldNames())
                .stream()
                .map(f -> new Field(f, idx.getAndIncrement()))
                .collect(Collectors.toList());
            return new GenericJsonRecord(schemaVersion, fields, fn);
        } else if (fn.isBoolean()) {
            return fn.asBoolean();
        } else if (fn.isFloatingPointNumber()) {
            return fn.asDouble();
        } else if (fn.isBigInteger()) {
            if (fn.canConvertToLong()) {
                return fn.asLong();
            } else {
                return fn.asText();
            }
        } else if (fn.isNumber()) {
            return fn.numberValue();
        } else {
            return fn.asText();
        }
    }
```
### Changes
Add check the jsonNode binary type and convert to binaryValue instead of `asText`.
…to instances of Golang pulsar functions (#8132)

### Motivation

Currently, pulsar functions written in Golang have no access to the content of the user-config parameter from admin CLI.
That's the simplest way to pass custom configuration into a go function right now because neither secretsMap nor Kubernetes runtime is available for Golang functions yet.

**Note**: For the `secretsMap` parameter, the same technique can be used, but this has to be addressed in a separate PR.

### Modifications

- Added `String GoInstanceConfig#userConfig` attribute
- Propagated user-config parameter in `RuntimeUtils.getGoInstanceCmd`
- Exposed userConfig in `pulsar-function-go` and made it available in the `FunctionContext`
- Add unit tests for the affected code
Fix #8171 

### Changes
expose ensemblePlacementPolicy in bookkeeper.conf, makes it can be configured sync with broker ensemble placement policy and applied to bookkeeper auto recovery.
* Upgrade Jcommander to 1.80

* Fix build

* 1.78 is the stable version

* Fix test

Co-authored-by: Sanjeev Kulkarni <sanjeevk@splunk.com>
### Motivation
In production env, we will monitor Pulsar broker log level and alert when encountered error log. However, when getBacklogQuota failed from topic policy or namespace policy, it will report error level logs and get default backlog quota. We should change the exception log level from error to warn to avoid unnecessary alert.

### Changes
1. change getBacklogQuota exception log level from error to warn.
**Modification**

Enable intercept filters only when interceptors are configured
### Motivation
Offload has been updated asynchronously in `checkPersistencePolicies()` on line 1888, and there is no need to update repeatedly.
Fix #7673 

### Changes
For each python defined field type, init default value.
### Motivation
There are many unnecessary locks in MultiTopicsConsumerImpl, which affect performance.
BlockingQueue is inherently thread-safe, and there is no need to lock in many places.

### Modifications
Remove unnecessary locks
 
### Verifying this change
Use the perf tool, 3 * 8-core 16G nodes,recording time is about 2 minutes

1. Prepare a 3-node pulsar cluster and produce some data(topic with 4 partitions)
2. Use pulsar-perf on another machine,
3. `bin/pulsar-perf consume -u 'http://x.x.x.x:8080' -s my-sub-6 -sp Earliest -q 100000 persistent://public/default/p-topic`
Pressure test twice, the first time with the original one, and the second time to replace the pulsar-client-original.jar in the lib folder

before removing:
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s

after removing:
Aggregated throughput stats --- 25062077 records received --- 161656.814 msg/s --- 1262.944 Mbit/s
Fixes #6408

### Motivation
1)ExecutionException cannot be cast to org.apache.pulsar.client.admin.PulsarAdminException
2)Client cannot recognize multiple addresses

### Modifications
support multiple addresses
### Motivation
When using pulsar-client CLI, we have a lot of log4j logs useless for just sending/receiving messages.
It could great to not display these logs with just a env variable (like in pulsar-admin).

### Modifications
The log level can be configured with the system property `pulsar.log.level` for the Java application.
The modification is the ability to set this level using a env variable PULSAR_LOG_LEVEL (default: info).
codelipenghui and others added 19 commits October 16, 2020 20:49
Motivation
To troubleshooting the consumer stop to receive messages under the key_shared subscription, it's difficult to know if the broker stops to dispatch messages to this consumer to preserve order. So this PR exposes a metrics to show the read position when the consumer joining so that we can compare the read position and the mark delete position to determine.

Verifying this change
Unit test added
Fixes #8223

### Motivation
Support limits the max tenants of the Pulsar cluster. When the tenants reach the max tenants of the cluster, the broker should reject the new tenant request.

### Modifications
Add maxTenants=0 in the broker.conf and don't limit by default.

### Verifying this change
TenantTest#testMaxTenant
Fixes #8224
### Motivation
Support limits the max namespaces per tenant of the Pulsar cluster. When the namespaces reach the max namespaces of the tenant, the broker should reject the new namespace request.


### Modifications
Add maxNamespacePerTenant=0 in the broker.conf and don't limit by default.

### Verifying this change
AdminApiTest2#testMaxNamespacePerTenant
Motivation

In order to facilitate the support of additional JClouds-supported providers, we first needed to clean up the existing code, as there were a lot of if/then/else constructs throughout the code that were based on the assumption that we either supported AWS S3 or Google Cloud Storage. I didn't want to keep adding else if's to these code blocks for every new provider we add, so I decided to refactor the code to make it a bit cleaner

Modifications

in addition to being home for most of the aforementioned if/then/else blocks, the BlobStoreManagedLedgerOffloader class had multiple responsibilities in addition to providing an implementation for the LedgerOffloader interface. My goal was to simplify this class such that its only responsibility was to implement the LedgerOffloader interface.

The other major change was the addition of the JCloudBlobStoreProvider enum, which implements 3 interfaces that allow for it to handle the provider specific logic for things such as acquiring the credentials, validating the configuration, and creating a provider-specific instance of BlobStore.

Result

After this change, we will be able to easily add support for additional JClouds-supported providers by simply adding new elements to the JCloudBlobStoreProvider Enums since the other logic has been isolated and is not vendor specific.

See #2865 for more details

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* Refactored JCloud Tiered Storage

* Refactored JCloud Tiered Storage

* Added missing import statements

* fix test

* add test logs

* fix logs

* fix test

* fix

* fix

* fix broker log

* fix configuration

* fix

* fix

* fix

* fix get BlobStore

* repair test presto query tiered storage data

* fix test

* fix test

* fix test TestPrestoQueryTieredStorage

* fix

* fix

Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: gaoran10 <gaoran_10@126.com>
Co-authored-by: xiaolong.ran <rxl@apache.org>
…ies (#8284)

Fix #8229 

### Bug Detail
refer to broker dump file info: https://gist.github.com/lhotari/c34db2664b15fc32c9f0e7ae8b37dbda#gistcomment-3491707

We take the first row in the extracted table for example.

```shell
cursor.markDeletePosition = 2273599:-1
opread readPosition = 2282101:0
opread nextReadPosition = 2282101:0
cursor readPosition = 2273599:0
cursor writePosition = 2273599:4
cursor.ledger.currentLedger.lastAddConfirmed = -1
cursor.ledger.currentLedger.ledgerId = 2282101

SQL:
((cursor.markDeletePosition.ledgerId.toString() + ":") + cursor.markDeletePosition.entryId.toString()) AS "cursor.markDeletePosition", ((readPosition.ledgerId.toString() + ":") + readPosition.entryId.toString()) AS "opread readPosition", 
((nextReadPosition.ledgerId.toString() + ":") + nextReadPosition.entryId.toString()) AS "opread nextReadPosition",
 ((cursor.readPosition.ledgerId.toString() + ":") + cursor.readPosition.entryId.toString()) AS "cursor readPosition", 
((cursor.ledger.lastConfirmedEntry.ledgerId.toString() + ":") + cursor.ledger.lastConfirmedEntry.entryId.toString()) AS "cursor writePosition", 
cursor.ledger.currentLedger.lastAddConfirmed, cursor.ledger.currentLedger.ledgerId.toString() AS
 "cursor.ledger.currentLedger.ledgerId"
```
When call `ManagedCursorImpl#asyncReadEntries`, cursor.readPosisition is **2273599:0**, however, when using cursor.readPostition to construct opReadEntry `OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx);`, it use the cursor.readPosition to construct op.readPostition `op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);`. Due to cursor.readPosition not exist in managedLedger ledgers map, `startReadOperationOnLedger` return the earliest available ledger position, and set op.readPosition to **2282101:0**, but the cursor.readPosition still **2273599:0**.

When call `ManagedLedgerImpl#asyncReadEntries` according to the constructed opReadEntry, it call `ManagedLedgerImpl#internalReadFromLedger`. The key variables as follow
```shell
ledger = 2282101:-1
lastPosition = 2273599:4
ledger.getId()[2282101] != lastPosition.getLedgerId() [2273599]
firstEntry = op.readPosition.getEntryId() = 0
lastEntryInLedger = ledger.getLastAddConfirmed = -1
```
Thus, it will go into the following branch
```java
if (firstEntry > lastEntryInLedger) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] No more messages to read from ledger={} lastEntry={} readEntry={}", name,
                        ledger.getId(), lastEntryInLedger, firstEntry);
            }

            if (currentLedger == null || ledger.getId() != currentLedger.getId()) {
                // Cursor was placed past the end of one ledger, move it to the
                // beginning of the next ledger
                Long nextLedgerId = ledgers.ceilingKey(ledger.getId() + 1);
                if (nextLedgerId != null) {
                    opReadEntry.updateReadPosition(new PositionImpl(nextLedgerId, 0));
                } else {
                    opReadEntry.updateReadPosition(new PositionImpl(ledger.getId() + 1, 0));
                }
            }

            opReadEntry.checkReadCompletion();
            return;
        }
```
Finally, it call `opReadEntry.checkReadCompletion()`, and then call `ManagedCursor#hasMoreEntries` to check whether has more entries to read. If `hasMoreEntries` returns true, it will setup another read thread to read more entries.

```java
public boolean hasMoreEntries() {
        // If writer and reader are on the same ledger, we just need to compare the entry id to know if we have more
        // entries.
        // If they are on different ledgers we have 2 cases :
        // * Writer pointing to valid entry --> should return true since we have available entries
        // * Writer pointing to "invalid" entry -1 (meaning no entries in that ledger) --> Need to check if the reader
        // is
        // at the last entry in the previous ledger
        PositionImpl writerPosition = ledger.getLastPosition();
        if (writerPosition.getEntryId() != -1) {
            return readPosition.compareTo(writerPosition) <= 0;
        } else {
            // Fall back to checking the number of entries to ensure we are at the last entry in ledger and no ledgers
            // are in the middle
            return getNumberOfEntries() > 0;
        }
    }
```
In `hasMoreEntries`, the key variables are `writerPosition` and `readPosition`. `writerPosition` is cursor.ledger.lastConfirmedEntry, which is `2273599:4` and `readPosition` is cursor.readPosition, which is `2273599:0`, thus, `hasMoreEntries` always return `true` and will fall into infinite loop and create a lot of read thread.

The bug is op.readPosition not sync immediatly with cursor.readPosition.

### Changes
1. sync op.readPosition with cursor.readPosition before calling `checkReadCompletion`.


* Fix Broker enters an infinite loop in ManagedLedgerImpl.asyncReadEntries

* format code

* format code
* add info for schema in Java function

* fix typos

* address comments
* [C++] Client is allocating buffer bigger than needed

* Removed `std::min()` as it was wrong
Signed-off-by: xiaolong.ran <rxl@apache.org>
…sar Java client #8294 (#8296)

* Some errors on the official website document of Pulsar Java client #8294

Co-authored-by: yixin <yixin@cmss.chinamobile.com>
### Modifications

When deploying Pulsar in Kubernetes, the zookeeper-metadata job does not reach completed status with ZooKeeper TLS enabled. These changes close the ZooKeeper connections before exiting therefore the Kubernetes job is marked as completed.
@wolfstudy
Copy link
Member

@wolfstudy would this be an acceptable change? If you prefer not to switch off warnings, I could add explicit casts.

Sorry for the reply later, the change looks good to me, thanks @bschofield

@wolfstudy
Copy link
Member

/pulsarbot run-failure-checks

@wolfstudy
Copy link
Member

Move this change to 2.7.0, we can consider including it in the larger version(2.7.0).

@codelipenghui
Copy link
Contributor

move to 2.8.0 first.

@codelipenghui codelipenghui modified the milestones: 2.7.0, 2.8.0 Nov 17, 2020
@codelipenghui codelipenghui removed this from the 2.8.0 milestone May 19, 2021
@wolfstudy
Copy link
Member

Thanks @bschofield work for this. For some reasons, we do not maintain the cgo version of the go client for the time being. You can use the native version of the go client. At present, all functions are basically the same as those of the cgo client, and there are many detailed optimizations.

If you have any questions in the process of using native go client, please feel free to leave your questions here, I am happy to talk with you.

@wolfstudy
Copy link
Member

@bschofield I closed this pull request first. If you have any questions, you can reopen it at any time and talk with me. Wish you a good journey in the native go client.

@wolfstudy wolfstudy closed this May 19, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/client type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages

Projects

None yet

Development

Successfully merging this pull request may close these issues.

"incompatible pointer types" warnings building the Go (golang) pulsar client