Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Fix the Send() stuck caused by callback() not being called #880

Merged
merged 1 commit into from
Nov 2, 2022

Conversation

Gleiphir2769
Copy link
Contributor

@Gleiphir2769 Gleiphir2769 commented Nov 1, 2022

Master Issue: #877

Motivation

The producer Send() is a sync method. It depends on closing doneCh by sendRequest.callback() to finish waiting. But there are some return case does not call sendRequest.callback(), which may cause Send() stuck.

if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
return
}

if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.publishSemaphore.Release()
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}

if err != nil {
p.publishSemaphore.Release()
p.log.WithError(err).Error("get schema version fail")
return

There are three places need to add sendRequest.callback() before return. These code is all intrduced in #611. I guess the author missed that if callback is not be called before retuen, Send() will stuck because doneCh can not be closed.

Modifications

Add the missed sendRequest.callback().

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

@Gleiphir2769
Copy link
Contributor Author

It can be easily reproduced by following.

func TestSchemaStuck(t *testing.T) {
	client, err := NewClient(ClientOptions{
		URL: lookupURL,
	})
	assert.NoError(t, err)
	defer client.Close()

	schema1 := NewAvroSchema(`{"fields":
		[
			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]}
		],
		"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
	schema2 := NewAvroSchema(`{"fields":
		[
			{"name":"id","type":"int"},{"default":null,"name":"name","type":["null","string"]},
			{"default":null,"name":"age","type":["null","int"]}
		],"name":"MyAvro3","namespace":"PulsarTestCase","type":"record"}`, nil)
	v1 := map[string]interface{}{
		"id": 1,
		"name": map[string]interface{}{
			"string": "aac",
		},
	}
	v2 := map[string]interface{}{
		"id": 1,
		"name": map[string]interface{}{
			"string": "test",
		},
		"age": map[string]interface{}{
			"int": 10,
		},
	}
	topic := newTopicName()

	producer, err := client.CreateProducer(ProducerOptions{
		Topic:              topic,
		Schema:             schema1,
		DisableMultiSchema: true,
	})
	assert.NoError(t, err)
	assert.NotNil(t, producer)
	defer producer.Close()

	for i := 0; i < 10; i++ {
		var messageContent []byte
		var key string
		var schema Schema
		if i%2 == 0 {
			messageContent, err = schema1.Encode(v1)
			key = "v1"
			schema = schema1
			assert.NoError(t, err)
		} else {
			messageContent, err = schema2.Encode(v2)
			key = "v2"
			schema = schema2
			assert.NoError(t, err)
		}

		_, err = producer.Send(context.Background(), &ProducerMessage{
			Payload: messageContent,
			Key:     key,
			Schema:  schema,
		})
	}
	producer.Flush()
}

@Gleiphir2769
Copy link
Contributor Author

@nodece @RobertIndie Hi, could you give a review when you have time?

@nodece nodece merged commit 0412f28 into apache:master Nov 2, 2022
@RobertIndie RobertIndie added this to the v0.10.0 milestone Mar 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants