Skip to content

Commit

Permalink
These changes are a quickfix to prevent an infinite loop while dispos…
Browse files Browse the repository at this point in the history
…ing clients. It does not fix the original problem but helps ELSA to continue working. (#3901)

Co-authored-by: Yannick Laubscher <yannick.laubscher@swissteach.ch>
  • Loading branch information
2 people authored and mohdali committed Apr 14, 2023
1 parent 0729c90 commit 35fe0ef
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/activities/Elsa.Activities.Kafka/Helpers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public static class Consumer
consumer.Subscribe(topic);
while (!cancellationToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(cancellationToken);
// This code gets executed sometimes, even though the consumer has been disposed. This results in an "handle is destroyed" exepction
var consumeResult = consumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF) continue;
consumer.Commit(consumeResult);
observer.OnNext(consumeResult.Message);
Expand Down
7 changes: 5 additions & 2 deletions src/activities/Elsa.Activities.Kafka/Services/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ public async Task Dispose()
{
try
{
_consumer.Unsubscribe();
_consumer.Close();
using (_consumer)
{
_consumer.Unsubscribe();
_consumer.Close();
}
}
catch (Exception e)
{
Expand Down
7 changes: 5 additions & 2 deletions src/activities/Elsa.Activities.Kafka/Services/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ private async Task OnMessageReceivedAsync(KafkaMessageEvent ev)

private async Task OnErrorAsync(Exception e)
{
_logger.LogError("Error on consuming");
await _disposeReceiverAction(this, _client);
_logger.LogError($"Error on consuming: {e.Message}");
if (!(e.GetType() == typeof(ObjectDisposedException))) // This line prevents an infinite loop when unpublishing a KafkaMessageReceived Trigger. Probably a SemaphoreSlim problem.
{
await _disposeReceiverAction(this, _client);
}
}

private async Task TriggerWorkflowsAsync(KafkaMessageEvent ev, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ private async Task RemoveAndRespawnWorkerAsync(Worker worker, IClient c, string
private async Task RemoveWorkerAsync(Worker worker)
{
_logger.LogDebug("Disposing worker for {QueueOrTopic}", worker.Topic);
await worker.DisposeAsync();
_workers.Remove(worker);
await worker.DisposeAsync();
}

private IEnumerable<Bookmark> Filter(IEnumerable<Bookmark> triggers)
Expand Down

0 comments on commit 35fe0ef

Please sign in to comment.