Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CapSubscribe Group Not Work #89

Closed
s995203 opened this issue Feb 28, 2018 · 2 comments
Closed

CapSubscribe Group Not Work #89

s995203 opened this issue Feb 28, 2018 · 2 comments

Comments

@s995203
Copy link

s995203 commented Feb 28, 2018

Group not working when use kafka
It's always DoSomthing1 or DoSomthing2

    [CapSubscribe("users.added", Group = "user.doSomthing1")]
    public void DoSomthing1(User user)
    {
        // somthing here...
    }

    [CapSubscribe("users.added", Group = "user.doSomthing2")]
    public void DoSomthing2(User user)
    {
        // somthing here...
    }

I think it's because async task use same setting object, and not lock correctly

DotNetCore.CAP.Kafka.KafkaConsumerClient

    private void InitKafkaClient()
    {
        _kafkaOptions.MainConfig["group.id"] = _groupId;

        var config = _kafkaOptions.AsKafkaConfig();
        _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
        _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
        _consumerClient.OnMessage += ConsumerClient_OnMessage;
        _consumerClient.OnError += ConsumerClient_OnError;
    }

Maybe should be

    private void InitKafkaClient()
    {
        lock (_kafkaOptions)
        {
            _kafkaOptions.MainConfig["group.id"] = _groupId;

            var config = _kafkaOptions.AsKafkaConfig();
            _consumerClient = new Consumer<Null, string>(config, null, StringDeserializer);
            _consumerClient.OnConsumeError += ConsumerClient_OnConsumeError;
            _consumerClient.OnMessage += ConsumerClient_OnMessage;
            _consumerClient.OnError += ConsumerClient_OnError;
        }
    }

Or what have I misunderstood?

@yang-xiaodong
Copy link
Member

Thanks for your report, I will check it later.

@yang-xiaodong
Copy link
Member

yang-xiaodong commented Mar 2, 2018

@s995203 I agree with you.
Because of KafkaOptions is a singleton, so there may be a thread safety issue here.
I will fix it , thanks again.

@s995203 s995203 closed this as completed Mar 2, 2018
yang-xiaodong added a commit that referenced this issue Mar 16, 2018
* Fixed the connection bug of getting message from table. #83

* update version to 2.1.4

* remove `TableNamePrefix` option from `MySqlOptions` to `EFOptions`.  #84

* fixed entityframework rename table name prefix bug.  #84

* fixed sql server scripts bug of create table scheme. #85

* fixed entityframework rename table name prefix bug. #84

* modify error message of logger write

* Fixed bug of the FailedRetryCount does not increase when raised SubscriberNotFoundException. #90

* Fixed thread safety issue about KafkaOptions. #89

* upgrade nuget package
yang-xiaodong added a commit that referenced this issue Aug 21, 2018
* Fixed the connection bug of getting message from table. #83

* update version to 2.1.4

* remove `TableNamePrefix` option from `MySqlOptions` to `EFOptions`.  #84

* fixed entityframework rename table name prefix bug.  #84

* fixed sql server scripts bug of create table scheme. #85

* fixed entityframework rename table name prefix bug. #84

* modify error message of logger write

* Fixed bug of the FailedRetryCount does not increase when raised SubscriberNotFoundException. #90

* Fixed thread safety issue about KafkaOptions. #89

* upgrade nuget package
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants