Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka Component #951

Merged
merged 3 commits into from
Jan 12, 2024
Merged

Conversation

g7ed6e
Copy link
Contributor

@g7ed6e g7ed6e commented Nov 19, 2023

Relates to #884

@dotnet-issue-labeler dotnet-issue-labeler bot added the area-integrations Issues pertaining to Aspire Integrations packages label Nov 19, 2023
Comment on lines 32 to 36
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092")
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{port}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do these ports come from? Is it worth leaving a comment about them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added comments and links. These ports are the defaults of the confluentinc/confluent-local image which are set when the container starts if the env var is not supplied. See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults

Comment on lines 11 to 35
return Task.Factory.StartNew(() =>
{
consumer.Subscribe("topic");
while (!stoppingToken.IsCancellationRequested)
{
var result = consumer.Consume(stoppingToken);
logger.LogInformation($"Received message '{result.Message.Value}'.");
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is very strange.. no async? Is it blocking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@gfoidl gfoidl Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confluentic has bad API here. It prefetches messages (if available), but it will block if nothing is there. That would be a perfect fit for ValueTask 😉.

Cf. confluentinc/confluent-kafka-dotnet#487

AddKafkaConsumer(builder, $"{DefaultConfigSectionName}:{name}", configureConsumerBuilder, configureConsumerConfig, connectionName: name, serviceKey: name);
}

private static void AddKafkaConsumer<TKey, TValue>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any health checks, telemetry etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet. I'm thinking to pull logs and metrics from librdkafka using the below delegates that can be registered within ProducerBuilder<TKey, TValue> and ConsumerBuilder<TKey, TValue> SetLogHandler, SetErrorHandler, SetStatisticsHandler. The STATISTICS.json contains metrics.

@mitchdenny
Copy link
Member

Overall, the app model side of this is looking pretty good. I think we need some end to end integration tests for this.

@mitchdenny
Copy link
Member

Once health checks are implemented, please wire-up the Kafa component inside our smoke test (new):

  1. Add container to app host here:
    IntegrationServiceA = AppBuilder.AddProject<Projects.IntegrationServiceA>("integrationservicea")
  2. Add component to the integration test project:

@g7ed6e
Copy link
Contributor Author

g7ed6e commented Nov 24, 2023

@mitchdenny Can you add Confluent.Kafka 2.3.0 and AspNetCore.HealthChecks.Kafka 7.0.0 ?

@davidfowl
Copy link
Member

I think we want a single component for kafka, not a package for the consumer and produce. Also, the component should be Aspire.Confluent.Kafka since that's the name of the underlying client library we're using.

PS: People are out of vacation this week so it's slow 😄

@riverar
Copy link

riverar commented Nov 25, 2023

Side comment: As additional components get developed/added, I wonder if we'll need an ingredients label so developers know which components (and which versions) do and do not have health checks, telemetry, etc.

@mitchdenny
Copy link
Member

I wonder if we'll need an ingredients label so developers know which components (and which versions) do and do not have health checks, telemetry, etc.

We probably shouldn't be merging it if it doesn't have telemetry, health-checks etc. In terms of dependency graph, I think we get that for free from NuGet so I don't think we need to do anything extra there.

@mitchdenny
Copy link
Member

@mitchdenny Can you add Confluent.Kafka 2.3.0 and AspNetCore.HealthChecks.Kafka 7.0.0 ?

They should be in dotnet-public shortly.

@g7ed6e
Copy link
Contributor Author

g7ed6e commented Nov 27, 2023

/azp run

Copy link

Commenter does not have sufficient privileges for PR 951 in repo dotnet/aspire

Copy link
Member

@eerhardt eerhardt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a README and ConfigurationSchema.json files


public static class Counters
{
public const string ReplyQueue = "replyq";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@g7ed6e g7ed6e Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not satisfied either by these names (basically same names as the ones from librdkafka statistics.json), but the naming pattern you linked seems to concern spans not metrics.
As these constants are used to expose specific metrics related to a given producer or consumer instance, I think we should instead follow a metric naming pattern.

edit: @eerhardt would prefixing these constants with messaging.kafka be sufficient ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmolkova you are needed!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the time being I prefixed metrics with messaging.kafka

Copy link
Contributor

@lmolkova lmolkova Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka metrics are in a weird place.
OTel is in the process of adding general messaging metrics into the spec - open-telemetry/semantic-conventions#163 (will be merged soon).

However Kafka as a project decided (AFAIK) to report a set of client metrics (KIP-714) that would be sent by default back to the broker and users would consume them from it. They decided not to follow otel semconv for the time being because of some back-compat reasons and limitations of existing metric impl on the broker side. Since they don't follow otel conventions, they also don't prefix with messaging. namespace.

I have a couple of suggestion

  • let's get an opinion from Kafka folks to understand the bigger picture. @forlack @AndrewJSchofield what are your thoughts on Kafka integration component for .NET Aspire emitting it's own set of OTel-compatible metrics different from what's defined in KIP-714? Also, could you confirm if I remember the decision (above) correctly or maybe something has changed? I envision the same conversation in OTel instrumentation world in Java, Python and other languages.

  • in the short term, WRT metrics added here, I'd recommend aligning them with otel messaging semantic conventions when applicable and following otel general naming conventions otherwise. I'll go through the changes and add my suggestions.

/cc @pyohannes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should align with the semantic conventions as well, even if they are not final. Maybe we can help push towards that goal.

we can't because we just don't get the right data from the kafka client library.

We're going to discuss naming/approach with otel semconv community - https://github.com/open-telemetry/semantic-conventions/issues/578, I should have an update soon.

Copy link
Contributor

@lmolkova lmolkova Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@g7ed6e would you be able to take a quick look at https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/bbfe950ad0ace8123a5e6817fb3767e27a1a2cee/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md and check if any of those metrics apply here and if we can unify the naming? (just discovered they exist).

In any case, metric naming and OTEL community discussion should not block this PR and can be changed in the follow-ups.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • in the short term, WRT metrics added here, I'd recommend aligning them with otel messaging semantic conventions when applicable and following otel general naming conventions otherwise. I'll go through the changes and add my suggestions.

💯 Agreed.

@lmolkova Ideally this could be modeled on top of generic messaging metrics that were recently defined, but it seems to me the overlap is quite small:

Besides that we could validate on what attributes we can converge, I'll try to have a closer look.

Copy link
Contributor Author

@g7ed6e g7ed6e Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pyohannes : I updated and renamed messaging.kafka.message.tx to messaging.publish.messages and
messaging.kafka.message.rx to messaging.receive.messages.

@lmolkova : I had a look to the linked implementation. Currently (in this Aspire PR) we do not intercept messages between user code and kafka client library. Instead we use a librdkafka feature which perdiodically fetch statistics (please notice we only parse the top level properties from the statistics json for now, there should be other metrics we may probably use and map to metrics in the future).
Regarding the interceptor / decorator approach we may implement it, but before I would like to ensure that we want to do this in Aspire component as having a look to the other components they do not hold too much code/logic and only exposes existing clients.

@eerhardt / @davidfowl : What are your thoughts about implementing otel distributed tracing with an internal wrapper around IProducer<TKey, TValue> / IConsumer<TKey, TValue> living in Aspire.Confluent.Kafka.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eerhardt / @davidfowl : What are your thoughts about implementing otel distributed tracing with an internal wrapper around IProducer<TKey, TValue> / IConsumer<TKey, TValue> living in Aspire.Confluent.Kafka.

Yes, this sounds reasonable as a way to drive the conversation forward for the default .NET Kafka library.

@eerhardt I know we don't do this today with rabbit MQ (I think it's harder there because there are more APIs to wrap).

@@ -0,0 +1,42 @@
Microsoft Visual Studio Solution File, Format Version 12.00
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this sample be moved to https://github.com/dotnet/aspire-samples?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning a follow up PR to add more metrics and (maybe) implement distributed tracing support by intercepting calls between user code and Confluent.Kafka. Having the sample in the solution highly ease debugging things. I propose to move the sample to the samples repo after that.

@eerhardt
Copy link
Member

@g7ed6e g7ed6e force-pushed the feature/add-kafka-connector branch 2 times, most recently from 950b59d to a68eb0d Compare December 16, 2023 06:55
@mitchdenny
Copy link
Member

@g7ed6e importing now.

@g7ed6e
Copy link
Contributor Author

g7ed6e commented Dec 19, 2023

@g7ed6e importing now.

Thanks @mitchdenny CI now completes successfully

@eerhardt
Copy link
Member

eerhardt commented Jan 9, 2024

I pushed 2 commits to update the ConfigurationSchema.json file to be more complete (sorted and using the generator from #1383). I put them in 2 commits for easy reviewing - first just sorts the existing properties. The second can be diff'd by itself to see what the generator changed.

Unfortunately I hit a bug with the ConfigurationBinder source generator - dotnet/runtime#96652. We aren't able to automate updating the ConfigurationSchema.json file automatically until we have a fix for that issue.

Copy link
Member

@eerhardt eerhardt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking pretty good. Thanks for the contribution. I have a few comments/questions. Once they are addressed I believe we can get this merged.

src/Aspire.Hosting/Kafka/KafkaServerResource.cs Outdated Show resolved Hide resolved
src/Aspire.Hosting/Kafka/KafkaBuilderExtensions.cs Outdated Show resolved Hide resolved

ConsumerConfig config = new();
configSection.GetSection(nameof(KafkaConsumerSettings.Config)).Bind(config);
settings.Config = config;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typical pattern we have is that the "Settings" object and the underlying library's "Options" object (in this case ConsumerConfig) are separate. There isn't a reference between them. Can we remove this property on KafkaConsumerSettings (and the producer as well)?

That way we can use the IOptions pattern for ConsumerConfig, which is what the Redis component does for its ConfigurationOptions object.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more at Kafka, there is a bit of a special pattern here in that the ConsumerConfig is separate from the ConsumerBuilder. We don't really have that pattern in the other components - typically there is just 1 object - ConfigurationOptions in Redis, IConnectionFactory in RabbitMQ.

Maybe what we have here is fine for now. It would be awkward to have 3 "configure" callbacks - configureSettings, configureConfig, configureBuilder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eerhardt The reference to the underlying library configuration object in the Aspire's "Settings" object lets customer configure a dedicated serializer / deserialize per pair of generic parameters provided to the underlying confluent XXXBuilder. Using IOptions pattern here would defeat it.
I "reproduced" the pattern I observed in the RabbitMQ implementation.

src/Components/Aspire.Confluent.Kafka/README.md Outdated Show resolved Hide resolved
src/Components/Aspire.Confluent.Kafka/MetricsService.cs Outdated Show resolved Hide resolved
src/Components/Aspire.Confluent.Kafka/MetricsService.cs Outdated Show resolved Hide resolved
@@ -331,6 +331,29 @@ public void EnsureAllRabitMQManifestTypesHaveVersion0Suffix()
Assert.Equal("container.v0", server.GetProperty("type").GetString());
}

[Fact]
public void EnsureAllKafkaManifestTypesHaveVersion0Suffix()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a test to makes sure the connectionString field is being emitted.

@mitchdenny
Copy link
Member

I think this is really close. A question or two from my perspective, but from an app model point of view this looks good.

}
}

[LoggerMessage(LogLevel.Warning, EventId = 42, Message = "Invalid statistics json payload received: `{json}`")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 42?

Copy link
Contributor Author

@g7ed6e g7ed6e Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which value would you suggest ?
edit: Is there an already existing source file containing event id ? I did not find one yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at other usages around dotnet/aspnetcore, it looks like the convention is to start at 1.

src/Components/Aspire.Confluent.Kafka/MetricsService.cs Outdated Show resolved Hide resolved
src/Components/Aspire.Confluent.Kafka/README.md Outdated Show resolved Hide resolved
tests/Aspire.Confluent.Kafka.Tests/MetricsTests.cs Outdated Show resolved Hide resolved
tests/Aspire.Confluent.Kafka.Tests/MetricsTests.cs Outdated Show resolved Hide resolved

ConsumerConfig config = new();
configSection.GetSection(nameof(KafkaConsumerSettings.Config)).Bind(config);
settings.Config = config;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking more at Kafka, there is a bit of a special pattern here in that the ConsumerConfig is separate from the ConsumerBuilder. We don't really have that pattern in the other components - typically there is just 1 object - ConfigurationOptions in Redis, IConnectionFactory in RabbitMQ.

Maybe what we have here is fine for now. It would be awkward to have 3 "configure" callbacks - configureSettings, configureConfig, configureBuilder.

@g7ed6e g7ed6e force-pushed the feature/add-kafka-connector branch 2 times, most recently from 33133d7 to 30fba83 Compare January 11, 2024 20:37
apply pr suggestions

apply pr suggestions

apply pr suggestions

Sort ConfigurationSchema.json properties

Update ConfigurationSchema.json using ConfigSchemaGenerator

apply pr suggestions

apply pr suggestions

apply pr suggesstions

apply pr suggesstions

apply pr suggestions

apply pr suggestions

drop kafka sample from this repo

apply pr suggestions
@g7ed6e g7ed6e force-pushed the feature/add-kafka-connector branch from 30fba83 to c540de4 Compare January 11, 2024 20:39
@g7ed6e
Copy link
Contributor Author

g7ed6e commented Jan 11, 2024

@eerhardt I just squashed end rebased until the Orleans support adding.

- Make Kafka Settings Config object non-nullable and not settable from user code.
- Use EventId 1 in Logging
- Log response body in Functional tests
Copy link
Member

@eerhardt eerhardt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks, @g7ed6e, for this contribution and all your great work.

I pushed some review feedback changes. Let me know if you have any issues with the changes.

Unless anyone else has any objections, I will merge this once CI passes.

@eerhardt eerhardt merged commit 93cbd9a into dotnet:main Jan 12, 2024
8 checks passed
@davidfowl
Copy link
Member

Thanks so much @g7ed6e!

@github-actions github-actions bot locked and limited conversation to collaborators Apr 29, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
area-integrations Issues pertaining to Aspire Integrations packages community-contribution Indicates that the PR has been added by a community member
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add Kafka Component