Skip to content

Commit

Permalink
Implemented AutoOffsetReset Property in server and activity (#3665)
Browse files Browse the repository at this point in the history
Co-authored-by: Yannick Laubscher <yannick.laubscher@swissteach.ch>
  • Loading branch information
2 people authored and mohdali committed Feb 8, 2023
1 parent 2daecc7 commit f2d6a5b
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 37 deletions.
@@ -0,0 +1,23 @@
using System.Collections.Generic;
using System.Reflection;
using Confluent.Kafka;
using Elsa.Design;
using Elsa.Metadata;

namespace Elsa.Activities.Kafka.Activities.KafkaMessageReceived;

/// <summary>
/// Provides a List of SelectListItem for each AutoOffsetReset enum item.
/// This is used to populate the dropdown list in the designer.
/// </summary>
public class AutoOffsetResetOptionsProvider : IActivityPropertyOptionsProvider
{
public object? GetOptions(PropertyInfo property)
{
return new List<SelectListItem>()
{
new SelectListItem(nameof(AutoOffsetReset.Earliest),((int)AutoOffsetReset.Earliest).ToString()),
new SelectListItem(nameof(AutoOffsetReset.Latest),((int)AutoOffsetReset.Latest).ToString()),
};
}
}
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using Confluent.Kafka;
using Elsa.Activities.Kafka.Helpers;
using Elsa.ActivityResults;
using Elsa.Attributes;
Expand All @@ -17,7 +18,6 @@ namespace Elsa.Activities.Kafka.Activities.KafkaMessageReceived
)]
public class KafkaMessageReceived : Activity
{

[ActivityInput(
Hint = "Topic to listen to",
Order = 1,
Expand All @@ -35,8 +35,8 @@ public class KafkaMessageReceived : Activity
Order = 3,
UIHint = ActivityInputUIHints.Dictionary,
DefaultSyntax = SyntaxNames.Json,
SupportedSyntaxes = new[] { SyntaxNames.Json,SyntaxNames.JavaScript })]
SupportedSyntaxes = new[] { SyntaxNames.Json, SyntaxNames.JavaScript })]

public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();

[ActivityInput(
Expand All @@ -46,6 +46,14 @@ public class KafkaMessageReceived : Activity
Category = PropertyCategories.Configuration)]
public string ConnectionString { get; set; } = default!;

[ActivityInput(
UIHint = ActivityInputUIHints.Dropdown,
OptionsProvider = typeof(AutoOffsetResetOptionsProvider),
Category = PropertyCategories.Configuration,
Order = 3
)]
public string AutoOffsetReset { get; set; } = default!;

public string ClientId => KafkaClientConfigurationHelper.GetClientId(Id);


Expand Down
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Elsa.Activities.Kafka.Configuration;
using Elsa.Activities.Kafka.Helpers;
using Elsa.Activities.Kafka.Services;
Expand Down Expand Up @@ -38,8 +39,8 @@ public SendKafkaMessage(IMessageSenderClientFactory messageSenderClientFactory)
Order = 3,
UIHint = ActivityInputUIHints.Dictionary,
DefaultSyntax = SyntaxNames.Json,
SupportedSyntaxes = new[] { SyntaxNames.Json,SyntaxNames.JavaScript })]
SupportedSyntaxes = new[] { SyntaxNames.Json, SyntaxNames.JavaScript })]

public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();

[ActivityInput(
Expand All @@ -55,8 +56,8 @@ public SendKafkaMessage(IMessageSenderClientFactory messageSenderClientFactory)
UIHint = ActivityInputUIHints.MultiLine,
SupportedSyntaxes = new[] { SyntaxNames.Json })]
public string Message { get; set; } = default!;


public string ClientId => KafkaClientConfigurationHelper.GetClientId(Id);

protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(ActivityExecutionContext context)
Expand All @@ -70,4 +71,4 @@ protected override async ValueTask<IActivityExecutionResult> OnExecuteAsync(Acti
return Done();
}
}
}
}
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -12,17 +13,21 @@ public MessageReceivedBookmark()
{
}

public MessageReceivedBookmark(string connectionString,string topic,string group, Dictionary<string, string>? headers)
public MessageReceivedBookmark(string connectionString, string topic, string group, Dictionary<string, string>? headers, Confluent.Kafka.AutoOffsetReset autoOffsetReset)
{
Topic = topic;
Group = group;
ConnectionString = connectionString;
Headers = headers ?? new Dictionary<string, string>();
AutoOffsetReset = autoOffsetReset;
}

public string Topic { get; set; } = default!;
public string Group { get; set; } = default!;
public string ConnectionString { get; set; } = default!;

public Confluent.Kafka.AutoOffsetReset AutoOffsetReset { get; set; } = Confluent.Kafka.AutoOffsetReset.Earliest;

public Dictionary<string, string> Headers { get; set; } = default!;
}

Expand All @@ -36,8 +41,9 @@ public class QueueMessageReceivedBookmarkProvider : BookmarkProvider<MessageRece
topic: (await context.ReadActivityPropertyAsync(x => x.Topic, cancellationToken))!,
group: (await context.ReadActivityPropertyAsync(x => x.Group, cancellationToken))!,
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!
))
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!,
autoOffsetReset: Enum.Parse<Confluent.Kafka.AutoOffsetReset>(await context.ReadActivityPropertyAsync(x => x.AutoOffsetReset, cancellationToken) ?? ((int)Confluent.Kafka.AutoOffsetReset.Earliest).ToString())!
))
};
}
}
}
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

namespace Elsa.Activities.Kafka.Configuration
{
Expand All @@ -8,27 +9,28 @@ public class KafkaConfiguration
public string ConnectionString { get; }
public string Topic { get; }
public string Group { get; }

public Confluent.Kafka.AutoOffsetReset AutoOffsetReset { get; }
public string ClientId { get; }

public Dictionary<string, string> Headers { get; }

public KafkaConfiguration(string connectionString, string topic, string group, Dictionary<string, string> headers,string clientId)
public KafkaConfiguration(string connectionString, string topic, string group, Dictionary<string, string> headers, string clientId, Confluent.Kafka.AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest)
{
ConnectionString = connectionString;
Topic = topic;
Group = group;
ClientId = clientId;
Headers = headers;
ClientId = clientId;
AutoOffsetReset = autoOffsetReset;
}

public override int GetHashCode()
{
var headersString = string.Concat(Headers.Select((x, y) => string.Concat(x, y)));
var headersString = string.Concat(Headers.Select((x, y) => string.Concat(x, y)));

return System.HashCode.Combine(ConnectionString, Topic, Group,headersString);
return System.HashCode.Combine(ConnectionString, Topic, Group, headersString);
}

public string TopicFullName => string.IsNullOrEmpty(Topic) ? Group : $"{Topic}@{Group}";
}
}
}
31 changes: 15 additions & 16 deletions src/activities/Elsa.Activities.Kafka/Services/Client.cs
Expand Up @@ -17,35 +17,36 @@ public class Client : IClient
private IConsumer<Ignore, string>? _consumer;
private Func<KafkaMessageEvent, Task>? _messageHandler;
private Func<Exception, Task>? _errHandler;

public Client(KafkaConfiguration configuration)
{
Configuration = configuration;
}

public KafkaConfiguration Configuration { get; }
public void SetHandlers(Func<KafkaMessageEvent, Task> receiveHandler,Func<Exception, Task> errorHandler,CancellationToken cancellationToken)

public void SetHandlers(Func<KafkaMessageEvent, Task> receiveHandler, Func<Exception, Task> errorHandler, CancellationToken cancellationToken)
{
_messageHandler = receiveHandler;
_cancellationToken = cancellationToken;
_errHandler = errorHandler;
}

public Task StartProcessing(string topic,string group)
public Task StartProcessing(string topic, string group)
{
_consumer = new ConsumerBuilder<Ignore, string>(new ConsumerConfig()
{
BootstrapServers = Configuration.ConnectionString,
GroupId = group
GroupId = group,
AutoOffsetReset = Configuration.AutoOffsetReset
}).Build();
if (_consumer != null)

if (_consumer != null)
Consumer.Consume(topic, _consumer).Subscribe(HandleMessage, OnError, _cancellationToken);

return Task.CompletedTask;
}

public async Task PublishMessage(string message)
{
var producerConfig = new ProducerConfig()
Expand All @@ -54,7 +55,7 @@ public async Task PublishMessage(string message)
};

using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();
await producer.ProduceAsync(Configuration.Topic, new Message<Null, string> {Headers = GetHeaders(),Value=message }, _cancellationToken);
await producer.ProduceAsync(Configuration.Topic, new Message<Null, string> { Headers = GetHeaders(), Value = message }, _cancellationToken);
}

public async Task Dispose()
Expand All @@ -65,14 +66,12 @@ public async Task Dispose()
{
_consumer.Unsubscribe();
_consumer.Close();

}
catch (Exception e)
{
if (_errHandler != null) await _errHandler(e);
}
}

}

private async void OnError(Exception error)
Expand All @@ -83,16 +82,16 @@ private async void OnError(Exception error)
private async void HandleMessage(Message<Ignore, string> message)
{
var ev = new KafkaMessageEvent(message, _cancellationToken);
if (_messageHandler != null)

if (_messageHandler != null)
await _messageHandler(ev);
}

private Headers GetHeaders()
{
var headers = new Headers();
foreach (var entry in Configuration.Headers)

foreach (var entry in Configuration.Headers)
headers.Add(entry.Key, Encoding.ASCII.GetBytes(entry.Value));

return headers;
Expand Down
2 changes: 1 addition & 1 deletion src/activities/Elsa.Activities.Kafka/Services/Worker.cs
Expand Up @@ -82,7 +82,7 @@ private async Task TriggerWorkflowsAsync(KafkaMessageEvent ev, CancellationToken

var config = _client.Configuration;

var bookmark = new MessageReceivedBookmark(config.ConnectionString, config.Topic, config.Group, GetHeaders(ev.Message.Headers));
var bookmark = new MessageReceivedBookmark(config.ConnectionString, config.Topic, config.Group, GetHeaders(ev.Message.Headers), config.AutoOffsetReset);
var launchContext = new WorkflowsQuery(ActivityType, bookmark);

using var scope = _scopeFactory.CreateScope();
Expand Down
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Elsa.Activities.Kafka.Bookmarks;
using Elsa.Activities.Kafka.Configuration;
using Elsa.Activities.Kafka.Helpers;
Expand Down Expand Up @@ -165,8 +166,9 @@ private KafkaConfiguration CreateConfigurationFromBookmark(MessageReceivedBookma
var group = bookmark.Group;
var headers = bookmark.Headers;
var clientId = KafkaClientConfigurationHelper.GetClientId(activityId);
var autoOffsetReset = bookmark.AutoOffsetReset;

return new KafkaConfiguration(connectionString!, topic!, group!, headers, clientId);
return new KafkaConfiguration(connectionString!, topic!, group!, headers, clientId, autoOffsetReset);
}
}
}

0 comments on commit f2d6a5b

Please sign in to comment.