Skip to content

Commit

Permalink
Bugfix/kafka Workaround (#3827)
Browse files Browse the repository at this point in the history
* Added "IngoreHeaders" Flag that when active skips the comparison of header objects and bookmarks

* merge from master

---------

Co-authored-by: Yannick Laubscher <yannick.laubscher@swissteach.ch>
  • Loading branch information
2 people authored and mohdali committed Apr 14, 2023
1 parent 7e343a0 commit 9e32b8c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ public class KafkaMessageReceived : Activity
public string Group { get; set; } = default!;

[ActivityInput(
Hint = "List of headers that should be present in the message",
Order = 3,
DefaultValue = false,
SupportedSyntaxes = new[] { SyntaxNames.Literal })]
public bool IgnoreHeaders { get; set; } = default!;

[ActivityInput(
Hint = "List of headers that should be present in the message",
Order = 4,
UIHint = ActivityInputUIHints.Dictionary,
DefaultSyntax = SyntaxNames.Json,
SupportedSyntaxes = new[] { SyntaxNames.Json, SyntaxNames.JavaScript })]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,30 @@ public MessageReceivedBookmark()
{
}

public MessageReceivedBookmark(string connectionString, string topic, string group, Dictionary<string, string>? headers, Confluent.Kafka.AutoOffsetReset autoOffsetReset, string schema)
public bool? Compare(IBookmark bookmark)
{
var equal = false;
var other = bookmark as MessageReceivedBookmark;
if (other is not null)
{
equal = other.Group == this.Group &&
other.Topic == this.Topic &&
(this.IgnoreHeaders || other.Headers == this.Headers) &&
other.AutoOffsetReset == this.AutoOffsetReset &&
other.Schema == this.Schema &&
other.ConnectionString == this.ConnectionString;
}
return equal;
}

public MessageReceivedBookmark(string connectionString, string topic, string group, Dictionary<string, string>? headers, Confluent.Kafka.AutoOffsetReset autoOffsetReset, string schema, bool ignoreHeaders)
{
Topic = topic;
Group = group;
ConnectionString = connectionString;
Headers = headers ?? new Dictionary<string, string>();
AutoOffsetReset = autoOffsetReset;
IgnoreHeaders = ignoreHeaders;
Schema = schema;
}

Expand All @@ -30,6 +47,7 @@ public MessageReceivedBookmark(string connectionString, string topic, string gro
public Confluent.Kafka.AutoOffsetReset AutoOffsetReset { get; set; } = Confluent.Kafka.AutoOffsetReset.Earliest;

public Dictionary<string, string> Headers { get; set; } = default!;
public bool IgnoreHeaders { get; set; }
public string Schema { get; set; }
}

Expand All @@ -45,6 +63,7 @@ public class QueueMessageReceivedBookmarkProvider : BookmarkProvider<MessageRece
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken))!,
autoOffsetReset: Enum.Parse<Confluent.Kafka.AutoOffsetReset>(await context.ReadActivityPropertyAsync(x => x.AutoOffsetReset, cancellationToken) ?? ((int)Confluent.Kafka.AutoOffsetReset.Earliest).ToString())!,
ignoreHeaders: (await context.ReadActivityPropertyAsync(x => x.IgnoreHeaders, cancellationToken))!,
schema: (await context.ReadActivityPropertyAsync(x => x.Schema, cancellationToken))!
))
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public override bool SupportsActivity(BookmarkProviderContext<KafkaMessageReceiv
connectionString: (await context.ReadActivityPropertyAsync(x => x.ConnectionString, cancellationToken))!,
headers: (await context.ReadActivityPropertyAsync(x => x.Headers, cancellationToken) ?? new Dictionary<string, string>())!,
autoOffsetReset: Enum.Parse<Confluent.Kafka.AutoOffsetReset>(await context.ReadActivityPropertyAsync(x => x.AutoOffsetReset, cancellationToken) ?? ((int)Confluent.Kafka.AutoOffsetReset.Earliest).ToString())!,
ignoreHeaders : (await context.ReadActivityPropertyAsync(x => x.IgnoreHeaders, cancellationToken))!,
schema: (await context.ReadActivityPropertyAsync(x => x.Schema, cancellationToken))!
))
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ public class KafkaConfiguration

public Dictionary<string, string> Headers { get; }

public KafkaConfiguration(string connectionString, string topic, string group, Dictionary<string, string> headers, string clientId, Confluent.Kafka.AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest)
public bool IgnoreHeaders { get; }

public KafkaConfiguration(string connectionString, string topic, string group, Dictionary<string, string> headers, string clientId, Confluent.Kafka.AutoOffsetReset autoOffsetReset = AutoOffsetReset.Earliest, bool ignoreHeaders = false)
{
ConnectionString = connectionString;
Topic = topic;
Group = group;
Headers = headers;
ClientId = clientId;
AutoOffsetReset = autoOffsetReset;
IgnoreHeaders = ignoreHeaders;
}

public override int GetHashCode()
Expand Down
7 changes: 5 additions & 2 deletions src/activities/Elsa.Activities.Kafka/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private async Task TriggerWorkflowsAsync(KafkaMessageEvent ev, CancellationToken
var schemaResolver = scope.ServiceProvider.GetRequiredService<ISchemaResolver>();
var schema = await schemaResolver.ResolveSchemaForMessage(ev.Message);

var bookmark = new MessageReceivedBookmark(config.ConnectionString, config.Topic, config.Group, GetHeaders(ev.Message.Headers), config.AutoOffsetReset, schema);
var bookmark = new MessageReceivedBookmark(config.ConnectionString, config.Topic, config.Group, GetHeaders(ev.Message.Headers, config.IgnoreHeaders), config.AutoOffsetReset, schema, config.IgnoreHeaders);
var launchContext = new WorkflowsQuery(ActivityType, bookmark, TenantId: tenantId);

// Launch KafkaMessageReceived activity
Expand All @@ -112,9 +112,12 @@ private async Task TriggerWorkflowsAsync(KafkaMessageEvent ev, CancellationToken
}
}

private Dictionary<string, string> GetHeaders(Headers headers)
private Dictionary<string, string> GetHeaders(Headers headers, bool ignoreHeaders)
{
var result = new Dictionary<string, string>();
if (ignoreHeaders)
return result;

foreach (var header in headers)
{
result.Add(header.Key, Encoding.UTF8.GetString(header.GetValueBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ private KafkaConfiguration CreateConfigurationFromBookmark(MessageReceivedBookma
var headers = bookmark.Headers;
var clientId = KafkaClientConfigurationHelper.GetClientId(activityId);
var autoOffsetReset = bookmark.AutoOffsetReset;

return new KafkaConfiguration(connectionString!, topic!, group!, headers, clientId, autoOffsetReset);
var ignoreHeaders = bookmark.IgnoreHeaders;
return new KafkaConfiguration(connectionString!, topic!, group!, headers, clientId, autoOffsetReset, ignoreHeaders);
}
}
}

0 comments on commit 9e32b8c

Please sign in to comment.