-
Notifications
You must be signed in to change notification settings - Fork 326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support multiple schema version for producer and consumer #611
Conversation
pulsar/consumer_partition.go
Outdated
} | ||
} | ||
|
||
func (s *schemaInfoCache) get(key string) (schema *Schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove the variable names here and below and return values from the functions. It makes the code more difficult to follow especially below since some returns have values and others do not.
pulsar/impl_message.go
Outdated
if err != nil { | ||
return err | ||
} | ||
return (*schema).Decode(msg.payLoad, v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is casting needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks review. Because schemaCache store the pointer of schema interface, I have change to schema instance .
pulsar/producer_partition.go
Outdated
} | ||
|
||
if p.options.DisableMultiSchema { | ||
if msg.Schema == nil && p.options.Schema == nil && !msg.Schema.GetSchemaInfo().isSame((p.options.Schema).GetSchemaInfo()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be simplified or a comment added. I'm having a difficult time understanding.
b9cea6a
to
0b1bb11
Compare
Hello @cckellogg and @oryx2. |
Hello @cckellogg and @oryx2. This is a gentle reminder to please review and merge this PR. As I mentioned above we are eagerly waiting for this feature |
@chiragbparikh there are merge conflicts in this PR. please resolve them so that the PR can be reviewed |
@lhotari I just resolve them, please check. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pulsar/consumer_partition.go
Outdated
@@ -143,10 +144,61 @@ type partitionConsumer struct { | |||
dlq *dlqRouter | |||
|
|||
log log.Logger | |||
|
|||
providersMutex sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run go fmt
to fix all formatting issues
pulsar/producer.go
Outdated
// Disable multiple Schame Version | ||
// Default false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whitespace
@cckellogg PTAL |
pulsar/consumer_partition.go
Outdated
@@ -143,10 +144,61 @@ type partitionConsumer struct { | |||
dlq *dlqRouter | |||
|
|||
log log.Logger | |||
|
|||
providersMutex sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Run go fmt
to fix all formatting issues
pulsar/consumer_partition.go
Outdated
schema = s.cache[key] | ||
s.lock.RUnlock() | ||
if schema != nil { | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return schema nil
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or
s.lock.RLock()
schema, ok = s.cache[key]
s.lock.RUnlock()
if ok {
return schema, nil
}
s.lock.Lock() | ||
defer s.lock.Unlock() | ||
|
||
s.cache[schemaVersionHash] = schema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we allow schema version overwrite?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, overwrite is not allowed. It is just a private function for SchemaInfoCache.Get
which do the overwrite check.
pulsar/consumer_partition.go
Outdated
var properties = make(map[string]string) | ||
if pbSchema.Properties != nil { | ||
for _, entry := range pbSchema.Properties { | ||
properties[*entry.Key] = properties[*entry.Value] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since Key and Value are both declared as *string, do you need check the reference is not nil before assigning them to properties?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found a function do the same job ConvertToStringMap, but no check nil. May be I should add nil check in ConvertToStringMap function, then invoker it.
pulsar/internal/lookup_service.go
Outdated
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic | |||
return topics, nil | |||
} | |||
|
|||
func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) { | |||
return nil, errors.New("not support") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this more descriptive? Maybe an error like this helps to debug.
errors.New("GetSchema is supported by httpLookupService")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I will add error message.
pulsar/producer_partition.go
Outdated
} | ||
|
||
if p.options.DisableMultiSchema { | ||
if msg.Schema != p.options.Schema { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we want to compare the pointer or the actual value. Shema is an interface. So the current evaluation just make sure they are pointing to the same reference. Do you want to compare the value of Schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Maybe I can compare the value of Schema with this.
msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash()
@cckellogg @zzzming can we please review the changes on the PR. Thanks in advance. |
@cckellogg @wolfstudy can we please get this MR merged. We are eagerly waiting to use this feature. |
@cckellogg can we please get a review on this MR? Appreciate it! |
…nto apache-master # Conflicts: # pulsar/consumer_partition.go # pulsar/producer_partition.go
This feature is something that we are eagerly awaiting. |
@oryx2 could you please check the CI errors? thanks. |
@freeznet OK |
Sorry,I mad some syntax error when resolve conflicts.Already fixed.Thanks for review. |
@oryx2 thanks, please check the CI failures again |
I am curious why this PR is getting so little attention. If there is no need for FORWARD_TRANSITIVE strategy in golang client, what would be a good alternative? |
Motivation
Implement PIP-43.
Support multiple schema version for producer and consumer.
Modifications
DisableMultiSchema
option for producerVerifying this change
(Please pick either of the following options)
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation