Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka support #3168

Merged
merged 5 commits into from
Jan 10, 2023
Merged

Conversation

borjaferv
Copy link
Contributor

This PR contains Kafka support using Confluent.Kafka for sending and receiving events.
Use a work manager to deploy a worker with a consumer for each topic-group configured in the workflow activities.
Additionally, support for headers has been added to be able to filter specific events based on the flow.

Regarding the reception of events, it works as a first activity (Trigger) or as a suspended activity (BookMark)

Known issues

In M1 Macs (arm) the Kafka client (Confluent.Kafka) has trouble locating the library
librdkafka

So the solution I came up with was having to manually install it with the Homebrew wizard and manually compile it from source.

git clone https://github.com/edenhill/librdkafka.git  
cd librdkafka 
./configure --install-deps 
brew install  openssl zstd pkg-config 
./configure 
make 
sudo make install 

@jdevillard
Copy link
Contributor

Thanks for this PR, have you take a look at the Service Bus Implementation for Request/Response Pattern using Correlation? In case you need to Send a message and receive a Response in the next activity, depending on the speed of the process you can have race condition.

On the Service Bus Activity, some lines were added :

    protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
    {
        var message = CreateMessage(Message);


        if (!string.IsNullOrWhiteSpace(context.WorkflowExecutionContext.CorrelationId))
            message.CorrelationId = context.WorkflowExecutionContext.CorrelationId;


        return Combine(Done(), new ServiceBusActionResult(GetQueue(), GetTopic(), message, SendMessageOnSuspend));
    }

If we want to be sure that Elsa Bookmark are indexed before receiving the response, we need to delay the send on the Suspend using (in case of the ServiceBus) the ServiceBusActionResult.

public class ServiceBusActionResult : ActivityExecutionResult

What do you think about that? If we think about this kind of flow using this connector, I think It could be good to implement also that code. @sfmskywalker , any thought on this point ?

@sfmskywalker
Copy link
Member

@jdevillard Agreed, it would be good to apply the same strategy to avoid the race condition you mentioned.

@Juandavi1
Copy link

Juandavi1 commented Jul 15, 2022

what do you think about to integrate those activities with slim ? https://github.com/zarusz/SlimMessageBus
this allow us to use diferents event bus

Copy link
Member

@sfmskywalker sfmskywalker left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why it took me so long, but thank you for your contribution!

@sfmskywalker
Copy link
Member

@Juandavi1 I can imagine a separate module that integrates SlimMessageBus, just like we do with Rebus and MassTransit 👍🏻

@sfmskywalker sfmskywalker merged commit a3d76ae into elsa-workflows:master Jan 10, 2023
@Snotax
Copy link

Snotax commented Jan 26, 2023

@sfmskywalker In which release is this going to be available ?

@sfmskywalker
Copy link
Member

Hi @Snotax, it will be part of the 2.10 release, which should happen soon.

@Snotax
Copy link

Snotax commented Jan 26, 2023

@sfmskywalker Awesome

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants