[improve] Use physical address information in connection pool key (#1206)

### Motivation

Migrate apache/pulsar#22085 and (parts of) apache/pulsar-client-cpp#411 over to the Go client. Context for this idea [here](https://github.com/apache/pulsar/pull/22085/files#r1497008116).

Go client support for blue-green migration requires the connection pool to differentiate between connections that share a logical address but have different physical addresses. Otherwise, the client might use the wrong connection, in effect pointing to the old cluster instead of the new one.

### Modifications

The connection pool maintains a map of connections, keyed by their logical address and a round-robin connection index. This PR proposes also including the physical address in the key, thereby allowing the upper layer to differentiate between connections with identical logical addresses but different physical addresses.
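As a rough illustration of the key change, the sketch below builds a pool key from both addresses. It mirrors the `fmt.Sprint` call in this PR's `getMapKey`, but `poolKey`, `mustParse`, and the host names are hypothetical, and the round-robin index is passed in explicitly instead of being derived from the pool's atomic counter.

```go
package main

import (
	"fmt"
	"net/url"
)

// poolKey is a hypothetical stand-in for the pool's key function.
// It joins the logical host, the physical host, and a connection index.
func poolKey(logicalAddr, physicalAddr *url.URL, idx int32) string {
	return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}

// mustParse parses a URL and panics on malformed input (illustration only).
func mustParse(raw string) *url.URL {
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	return u
}

func main() {
	// Assumed scenario: one logical address that maps to two different
	// physical addresses during a blue-green migration.
	logical := mustParse("pulsar://proxy:6650")
	oldBroker := mustParse("pulsar://broker-old:6650")
	newBroker := mustParse("pulsar://broker-new:6650")

	fmt.Println(poolKey(logical, oldBroker, 0)) // proxy:6650-broker-old:6650-0
	fmt.Println(poolKey(logical, newBroker, 0)) // proxy:6650-broker-new:6650-0
}
```

With a key built from the logical host alone, both calls would produce the same string, so the two connections could not be told apart in the map.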

In addition to this change, the test setup had to be fixed to address breakages in `TestRetryWithMultipleHosts` and `TestReaderWithMultiHosts`. All tests in the repository currently use a local standalone setup. In this unusual configuration, broker lookup operations reply with the flag `proxyThroughServiceUrl=true` ([ref](https://github.com/apache/pulsar/blob/e7c2a75473b545134a3b292ae0e87a79d65cb756/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L369)). This in turn makes the Go lookup service attempt a name resolution of the configured service addresses ([ref](https://github.com/apache/pulsar-client-go/blob/c3e94e243a730ae22d59bf9d330c4539733b7eef/pulsar/internal/lookup_service.go#L124)). The resolver picks addresses in round-robin mode. Because these two tests use a correct (reachable) broker address _and_ an unreachable address, the resolved address can end up being the unreachable one. The connection pool then holds a logically invalid entry, causing the tests to fail:

| Logical Address | Physical Address | Notes |
| --------------- | ---------------- | ----- |
| reachable-broker | reachable-broker | Valid |
| unreachable-broker | unreachable-broker | Valid, but currently unusable |
| reachable-broker | unreachable-broker | *Invalid entry* |
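The invalid row can be reproduced in miniature. The sketch below assumes a simplified round-robin resolver; `roundRobinResolver` and `lookupPairs` are hypothetical names, not the client's API. It simulates lookups that always report the same logical address while the physical address is resolved from the configured service addresses.

```go
package main

import "fmt"

// roundRobinResolver is a hypothetical, simplified resolver that cycles
// through the configured service addresses.
type roundRobinResolver struct {
	addrs []string
	next  int
}

// resolve returns the next configured address in round-robin order.
func (r *roundRobinResolver) resolve() string {
	addr := r.addrs[r.next%len(r.addrs)]
	r.next++
	return addr
}

// lookupPairs simulates n lookups answered with proxyThroughServiceUrl=true:
// the logical address comes from the lookup response, while the physical
// address is whatever the resolver hands out next.
func lookupPairs(r *roundRobinResolver, logical string, n int) [][2]string {
	pairs := make([][2]string, 0, n)
	for i := 0; i < n; i++ {
		pairs = append(pairs, [2]string{logical, r.resolve()})
	}
	return pairs
}

func main() {
	r := &roundRobinResolver{addrs: []string{"reachable-broker", "unreachable-broker"}}
	for _, p := range lookupPairs(r, "reachable-broker", 2) {
		fmt.Printf("logical=%s physical=%s\n", p[0], p[1])
	}
}
```

The second simulated lookup pairs the reachable logical address with the unreachable physical address, which corresponds to the invalid entry in the table.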

To address the issue:
- Switch the test setup to a more common cluster configuration. File `integration-tests/clustered/docker-compose.yml` describes what this setup should look like.
- Migrate the tests to separate files and test suites. New test files `pulsar/client_impl_clustered_test.go` and `pulsar/reader_clustered_test.go` carry the Go build tag `clustered`, so the Go build process excludes them from the standalone test runs.
- Add script `run-ci-clustered.sh`, specifying the "clustered" tests to run.
- Changes in the `Makefile` add targets `make test_clustered` and `make test_standalone` to run the respective test suites independently, while allowing `make test` to run all the tests, as before.
- `Dockerfile` and `run-ci.sh` are modified to run the Go build process in the container build, such that it does not need to be run again in the new `run-ci-clustered.sh` script. The image is locally consumed by the tests only and is not published, so there is no risk of contaminating users.
dragosvictor committed Apr 18, 2024
1 parent a6f4b71 commit 26e8085
Showing 10 changed files with 393 additions and 121 deletions.
13 changes: 13 additions & 0 deletions Dockerfile
@@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \
COPY . /pulsar/pulsar-client-go

ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd"
+
+WORKDIR /pulsar/pulsar-client-go
+
+ENV GOPATH=/pulsar/go
+ENV GOCACHE=/tmp/go-cache
+
+# Install dependencies
+RUN go mod download
+
+# Basic compilation
+RUN go build ./pulsar
+RUN go build ./pulsaradmin
+RUN go build -o bin/pulsar-perf ./perf
10 changes: 9 additions & 1 deletion Makefile
@@ -44,9 +44,17 @@ container:
--build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \
--build-arg ARCH="${CONTAINER_ARCH}" .

-test: container
+test: container test_standalone test_clustered
+
+test_standalone: container
docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"
+
+test_clustered: container
+PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true
+until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
+docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh"
+PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down

clean:
docker rmi --force $(IMAGE_NAME) || true
rm bin/*
167 changes: 167 additions & 0 deletions integration-tests/clustered/docker-compose.yml
@@ -0,0 +1,167 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3'
networks:
  pulsar:
    driver: bridge
services:
  # Start ZooKeeper
  zookeeper:
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    container_name: zookeeper
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
      bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
      exec bin/pulsar zookeeper"
    healthcheck:
      test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
      interval: 10s
      timeout: 5s
      retries: 30

  # Initialize cluster metadata
  pulsar-init:
    container_name: pulsar-init
    hostname: pulsar-init
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    networks:
      - pulsar
    environment:
      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
    command: >
      bin/pulsar initialize-cluster-metadata \
      --cluster cluster-a \
      --zookeeper zookeeper:2181 \
      --configuration-store zookeeper:2181 \
      --web-service-url http://broker-1:8080 \
      --broker-service-url pulsar://broker-1:6650
    depends_on:
      zookeeper:
        condition: service_healthy

  # Start bookie
  bookie:
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    container_name: bookie
    restart: on-failure
    networks:
      - pulsar
    environment:
      - clusterName=cluster-a
      - zkServers=zookeeper:2181
      - metadataServiceUri=metadata-store:zk:zookeeper:2181
      - advertisedAddress=bookie
      - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
    depends_on:
      zookeeper:
        condition: service_healthy
      pulsar-init:
        condition: service_completed_successfully
    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

  proxy:
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    container_name: proxy
    hostname: proxy
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - zookeeperServers=zookeeper:2181
      - clusterName=cluster-a
      - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
    ports:
      - "8080:8080"
      - "6650:6650"
    depends_on:
      broker-1:
        condition: service_healthy
      broker-2:
        condition: service_healthy
    command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"

  # Start broker 1
  broker-1:
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    container_name: broker-1
    hostname: broker-1
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - zookeeperServers=zookeeper:2181
      - clusterName=cluster-a
      - managedLedgerDefaultEnsembleSize=1
      - managedLedgerDefaultWriteQuorum=1
      - managedLedgerDefaultAckQuorum=1
      - advertisedAddress=broker-1
      - internalListenerName=internal
      - advertisedListeners=internal:pulsar://broker-1:6650
      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
    depends_on:
      zookeeper:
        condition: service_healthy
      bookie:
        condition: service_started
    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 30

  # Start broker 2
  broker-2:
    image: apachepulsar/pulsar:${PULSAR_VERSION}
    container_name: broker-2
    hostname: broker-2
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - zookeeperServers=zookeeper:2181
      - clusterName=cluster-a
      - managedLedgerDefaultEnsembleSize=1
      - managedLedgerDefaultWriteQuorum=1
      - managedLedgerDefaultAckQuorum=1
      - advertisedAddress=broker-2
      - internalListenerName=internal
      - advertisedListeners=internal:pulsar://broker-2:6650
      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
      - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
    depends_on:
      zookeeper:
        condition: service_healthy
      bookie:
        condition: service_started
    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 30
89 changes: 89 additions & 0 deletions pulsar/client_impl_clustered_test.go
@@ -0,0 +1,89 @@
//go:build clustered

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	"context"
	"fmt"
	"testing"

	"github.com/stretchr/testify/suite"
)

type clientClusteredTestSuite struct {
	suite.Suite
}

func TestClientClusteredTestSuite(t *testing.T) {
	suite.Run(t, new(clientClusteredTestSuite))
}

func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() {
	req := suite.Require()
	// Multi hosts included an unreached port and the actual port for verify retry logic
	client, err := NewClient(ClientOptions{
		URL: "pulsar://broker-1:6600,broker-1:6650",
	})
	req.NoError(err)
	defer client.Close()

	topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: topic,
	})
	req.NoError(err)
	defer producer.Close()

	ctx := context.Background()
	var msgIDs [][]byte

	for i := 0; i < 10; i++ {
		if msgID, err := producer.Send(ctx, &ProducerMessage{
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		}); err != nil {
			req.NoError(err)
		} else {
			req.NotNil(msgID)
			msgIDs = append(msgIDs, msgID.Serialize())
		}
	}

	req.Equal(10, len(msgIDs))

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:                       topic,
		SubscriptionName:            "retry-multi-hosts-sub",
		Type:                        Shared,
		SubscriptionInitialPosition: SubscriptionPositionEarliest,
	})
	req.NoError(err)
	defer consumer.Close()

	for i := 0; i < 10; i++ {
		msg, err := consumer.Receive(context.Background())
		req.NoError(err)
		req.Contains(msgIDs, msg.ID().Serialize())
		consumer.Ack(msg)
	}

	err = consumer.Unsubscribe()
	req.NoError(err)
}
55 changes: 0 additions & 55 deletions pulsar/client_impl_test.go
@@ -570,61 +570,6 @@ func anonymousNamespacePolicy() map[string]interface{} {
}
}

-func TestRetryWithMultipleHosts(t *testing.T) {
-// Multi hosts included an unreached port and the actual port for verify retry logic
-client, err := NewClient(ClientOptions{
-URL: "pulsar://localhost:6600,localhost:6650",
-})
-
-assert.Nil(t, err)
-defer client.Close()
-
-topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()
-
-producer, err := client.CreateProducer(ProducerOptions{
-Topic: topic,
-})
-
-assert.Nil(t, err)
-defer producer.Close()
-
-ctx := context.Background()
-var msgIDs [][]byte
-
-for i := 0; i < 10; i++ {
-if msgID, err := producer.Send(ctx, &ProducerMessage{
-Payload: []byte(fmt.Sprintf("hello-%d", i)),
-}); err != nil {
-assert.Nil(t, err)
-} else {
-assert.NotNil(t, msgID)
-msgIDs = append(msgIDs, msgID.Serialize())
-}
-}
-
-assert.Equal(t, 10, len(msgIDs))
-
-consumer, err := client.Subscribe(ConsumerOptions{
-Topic: topic,
-SubscriptionName: "retry-multi-hosts-sub",
-Type: Shared,
-SubscriptionInitialPosition: SubscriptionPositionEarliest,
-})
-assert.Nil(t, err)
-defer consumer.Close()
-
-for i := 0; i < 10; i++ {
-msg, err := consumer.Receive(context.Background())
-assert.Nil(t, err)
-assert.Contains(t, msgIDs, msg.ID().Serialize())
-consumer.Ack(msg)
-}
-
-err = consumer.Unsubscribe()
-assert.Nil(t, err)
-
-}
-
func TestHTTPSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: webServiceURLTLS,
7 changes: 4 additions & 3 deletions pulsar/internal/connection_pool.go
@@ -79,7 +79,8 @@ func NewConnectionPool(
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
-key := p.getMapKey(logicalAddr)
+p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection")
+key := p.getMapKey(logicalAddr, physicalAddr)

p.Lock()
conn, ok := p.connections[key]
@@ -133,13 +134,13 @@ func (p *connectionPool) Close() {
p.Unlock()
}

-func (p *connectionPool) getMapKey(addr *url.URL) string {
+func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
if cnt < 0 {
cnt = -cnt
}
idx := cnt % p.maxConnectionsPerHost
-return fmt.Sprint(addr.Host, '-', idx)
+return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}

func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {