Skip to content
This repository has been archived by the owner on Jun 14, 2023. It is now read-only.

Update kafka.go #56

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion kafkareporter/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func New(addrs []string, opts ...Option) (go2sky.Reporter, error) {
}
}()
}

go r.readBackMessage()
return r, nil
}

Expand Down Expand Up @@ -297,6 +297,17 @@ func (r *kafkaReporter) Close() {
r.logger.Print(err)
}
}
//The result information of the reported information is processed in a loop to achieve multiple reports in a single kafka connection.
func (r *kafkaReporter) readBackMessage() {
for {
select {
case <-r.producer.Successes():
case err := <- r.producer.Errors():
r.logger.Fatalf( "kafkaReport readBackMessage error detail is %s",err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using the log.Errorf output will do, which will cause the process to end

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can check the revised document to confirm. github.com/Shopify/sarama/async_producer.go

// Successes is the success output channel back to the user when Return.Successes is
// enabled. If Return.Successes is true, you MUST read from this channel or the
// Producer will deadlock. It is suggested that you send and read messages
// together in a single select statement.
Successes() <-chan *ProducerMessage

// Errors is the error output channel back to the user. You MUST read from this
// channel or the Producer will deadlock when the channel is full. Alternatively,
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
Errors() <-chan *ProducerError

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default:
}
}
}

func buildOSInfo() (props []*commonv3.KeyStringValuePair) {
processNo := tool.ProcessNo()
Expand Down