Skip to content

Commit

Permalink
Simplify Shutdown with ConsumerDataflowService/ConsumerDataflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
houseofcat committed Apr 29, 2024
1 parent 6c54286 commit 1b69235
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 17 deletions.
6 changes: 3 additions & 3 deletions src/HouseofCat.Dataflows/BaseDataflow.cs
Expand Up @@ -131,7 +131,7 @@ void WrapAction(TState state)
}

childSpan.End();
state.EndRootSpan(true);
state.EndStateSpan(true);
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -156,7 +156,7 @@ void WrapAction(TState state)
}

childSpan.End();
state.EndRootSpan(true);
state.EndStateSpan(true);
}

return new ActionBlock<TState>(WrapAction, options);
Expand All @@ -181,7 +181,7 @@ async Task WrapActionAsync(TState state)
}

childSpan.End();
state.EndRootSpan(true);
state.EndStateSpan(true);
}

return new ActionBlock<TState>(WrapActionAsync, options);
Expand Down
2 changes: 1 addition & 1 deletion src/HouseofCat.Dataflows/Extensions/WorkStateExtensions.cs
Expand Up @@ -155,7 +155,7 @@ public static void SetSpanAsError(this IWorkState state, TelemetrySpan span, str
attributes: attributes);
}

public static void EndRootSpan(
public static void EndStateSpan(
this IWorkState state,
bool includeErrorWhenFaulted = false)
{
Expand Down
12 changes: 10 additions & 2 deletions src/HouseofCat.RabbitMQ/Services/ConsumerDataflowService.cs
Expand Up @@ -112,8 +112,16 @@ public async Task StartAsync()
await Dataflow.StartAsync();
}

public async Task StopAsync()
/// <summary>
/// Provides mechanism to stop the Dataflow gracefully and optionally shutdown RabbitService.
/// </summary>
/// <param name="immediate"></param>
/// <param name="shutdownService"></param>
/// <returns></returns>
public async Task StopAsync(
bool immediate = false,
bool shutdownService = false)
{
await Dataflow.StopAsync();
await Dataflow.StopAsync(immediate, shutdownService);
}
}
8 changes: 4 additions & 4 deletions tests/OpenTelemetry.Console.Tests/Tests/WorkStateTests.cs
Expand Up @@ -22,7 +22,7 @@ public static void RunRootSpanTest(ILogger logger, string workflowName)
var workstate = new CustomWorkState();

workstate.StartWorkflowSpan(workflowName, spanKind: SpanKind.Internal);
workstate.EndRootSpan();
workstate.EndStateSpan();

logger.LogInformation($"Finished {nameof(RunRootSpanTest)}.");
}
Expand All @@ -38,7 +38,7 @@ public static void RunRootSpanWithChildSpanTest(ILogger logger, string workflowN
{
span.SetStatus(Status.Ok);
}
workstate.EndRootSpan();
workstate.EndStateSpan();

logger.LogInformation($"Finished {nameof(RunRootSpanWithChildSpanTest)}.");
}
Expand All @@ -56,7 +56,7 @@ public static void RunRootSpanWithChildSpanErrorTest(ILogger logger, string work
workstate.SetCurrentSpanAsError("Span had an error!");
}

workstate.EndRootSpan();
workstate.EndStateSpan();

logger.LogInformation($"Finished {nameof(RunRootSpanWithChildSpanTest)}.");
}
Expand All @@ -83,7 +83,7 @@ public static void RunRootSpanWithManyChildSpanFlatLevelTest(ILogger logger, str
}
}

workstate.EndRootSpan();
workstate.EndStateSpan();

logger.LogInformation($"Finished {nameof(RunRootSpanWithChildSpanTest)}.");
}
Expand Down
10 changes: 3 additions & 7 deletions tests/RabbitMQ.ConsumerDataflowService/Program.cs
Expand Up @@ -129,15 +129,11 @@
app.Lifetime.ApplicationStopping.Register(
async () =>
{
logger.LogInformation("RabbitService AutoPublish stopping...");
await rabbitService.Publisher.StopAutoPublishAsync();
logger.LogInformation("ConsumerDataflowService stopping...");
await dataflowService.StopAsync();
await rabbitService.ShutdownAsync(false);
await dataflowService.StopAsync(
immediate: false,
shutdownService: true);
logger.LogInformation("All stopped! Press return to exit...");
});
Expand Down

0 comments on commit 1b69235

Please sign in to comment.