diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index cd7aa5019..28cb9c556 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1734,6 +1734,15 @@ func (pc *partitionConsumer) grabConn() error { if err != nil { pc.log.WithError(err).Error("Failed to create consumer") + if err == internal.ErrRequestTimeOut { + requestID := pc.client.rpcClient.NewRequestID() + cmdClose := &pb.CommandCloseConsumer{ + ConsumerId: proto.Uint64(pc.consumerID), + RequestId: proto.Uint64(requestID), + } + _, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID, + pb.BaseCommand_CLOSE_CONSUMER, cmdClose) + } return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5daf54c44..7d514d550 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -251,6 +251,14 @@ func (p *partitionProducer) grabCnx() error { res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") + if err == internal.ErrRequestTimeOut { + id := p.client.rpcClient.NewRequestID() + _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER, + &pb.CommandCloseProducer{ + ProducerId: &p.producerID, + RequestId: &id, + }) + } return err }