OOM #1043

Closed
chushuai opened this issue Aug 16, 2023 · 3 comments

Comments

@chushuai

chushuai commented Aug 16, 2023

I have 7 billion records to write to Kafka, and after writing about 200 million the process runs out of memory (OOM). Below is part of my code logic. Can you help me see what the problem is?

	producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapServers})
	domainList := []string{}
	scanner := bufio.NewScanner(inFIle)
	count := 0
	for scanner.Scan() {
		lineText := strings.TrimSpace(scanner.Text())
		if lineText == "" {
			continue
		}
		count += 1
		if count%1000000 == 0 {
			logger.Infof("Already read %d lines", count)
		}
		domainList = append(domainList, lineText)
		if len(domainList) >= 1000 {
			raw, _ := json.Marshal(domainList)
			err = producer.Produce(&kafka.Message{
				TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
				Value:          raw,
			}, nil)
			if err != nil {
				logger.Error(err)
				if err.(kafka.Error).Code() == kafka.ErrQueueFull {
					logger.Info("kafka queue is full, wait 10s for messages to be delivered then try again")
					time.Sleep(10*time.Second)
					continue
				}
			}
			domainList = []string{}
		}
	}
@milindl
Contributor

milindl commented Aug 24, 2023

Just a question: are you reading from producer.Events() anywhere (perhaps in a different goroutine)? If not, the channel will keep growing with every message that is delivered.

@chushuai
Author

I am not reading from producer.Events() anywhere, and the example code I followed did not contain any references to producer.Events() either. @milindl

@milindl
Contributor

milindl commented Aug 25, 2023

You can take a look at this example: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_example/producer_example.go#L51
You should also read the messages from producer.Events() in your code.
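
To make the suggestion concrete, here is a minimal sketch of the drain-in-a-goroutine pattern. It is an illustration only and uses just the standard library: `deliveryReport`, `drainReports`, and `produceAll` are hypothetical names, and the plain channel stands in for `producer.Events()`. In the real client the events would be `*kafka.Message` values and the error would be checked via `TopicPartition.Error`, as in the linked example.

```go
package main

import (
	"errors"
	"sync"
)

// deliveryReport is a hypothetical stand-in for the events a producer emits.
// With confluent-kafka-go these would be read from producer.Events().
type deliveryReport struct {
	Payload []byte
	Err     error
}

// drainReports consumes every report until the channel is closed and counts
// failed deliveries. It is meant to run in its own goroutine so that reports
// are consumed while the producing loop is still running.
func drainReports(events <-chan deliveryReport, wg *sync.WaitGroup, failed *int) {
	defer wg.Done()
	for ev := range events {
		if ev.Err != nil {
			*failed++
		}
	}
}

// produceAll simulates producing n messages. Every failEvery-th message is
// reported as failed (failEvery <= 0 means no failures). It returns the
// number of failed deliveries seen by the drain goroutine.
func produceAll(n, failEvery int) int {
	events := make(chan deliveryReport, 16) // small buffer: the reader must keep up
	failed := 0

	var wg sync.WaitGroup
	wg.Add(1)
	go drainReports(events, &wg, &failed)

	for i := 1; i <= n; i++ {
		var err error
		if failEvery > 0 && i%failEvery == 0 {
			err = errors.New("simulated delivery failure")
		}
		events <- deliveryReport{Payload: []byte("batch"), Err: err}
	}

	close(events) // no more reports; lets drainReports return
	wg.Wait()     // after this, reading failed is safe
	return failed
}
```

In the code from the issue, the equivalent change would be to start a goroutine that ranges over `producer.Events()` right after `kafka.NewProducer` succeeds, before entering the `scanner.Scan()` loop.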
