Skip to content

Commit

Permalink
Fix the client crash when the transaction coordinator not found (#1241)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Jul 11, 2024
1 parent 1152cfc commit 64d1e00
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pulsar/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ const (
// fenced. Applications are now supposed to close it and create a
// new producer
ProducerFenced
// TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled.
// This error is returned when an operation that requires the transaction coordinator is attempted
// but the transaction coordinator feature is not enabled in the system or the transaction coordinator
// has not initialized
TransactionCoordinatorNotEnabled
)

// Error implement error interface, composed of two parts: msg and result.
Expand Down
6 changes: 6 additions & 0 deletions pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type transactionCoordinatorClient struct {
// where the TC located.
const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign"

var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+
"the transaction coordinator, or the transaction coordinator has not initialized")

// newTransactionCoordinatorClientImpl init a transactionImpl coordinator client and
// acquire connections with all transactionImpl coordinators.
func newTransactionCoordinatorClientImpl(client *client) *transactionCoordinatorClient {
Expand All @@ -60,6 +63,9 @@ func (tc *transactionCoordinatorClient) start() error {
tc.cons = make([]internal.Connection, r.Partitions)

//Get connections with all transaction_impl coordinators which is synchronized
if r.Partitions <= 0 {
return ErrTransactionCoordinatorNotEnabled
}
for i := 0; i < r.Partitions; i++ {
err := tc.grabConn(uint64(i))
if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions pulsar/transaction_disabled_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build clustered

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestTransactionDisabled(t *testing.T) {
_, err := NewClient(ClientOptions{
URL: "pulsar://broker-1:6650",
EnableTransaction: true,
})
assert.NotNil(t, err)
assert.ErrorContains(t, err, "Transactions are not enabled")
}
2 changes: 1 addition & 1 deletion scripts/run-ci-clustered.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@

set -e -x

go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -v -run 'Test.*ClusteredTestSuite' -v ./pulsar
go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -v -run 'Test.*ClusteredTestSuite|TestTransactionDisabled' -v ./pulsar
go tool cover -html=/tmp/coverage -o coverage.html

0 comments on commit 64d1e00

Please sign in to comment.