From 315c3f5c925099c19ba0c8bd561a17db14a3b480 Mon Sep 17 00:00:00 2001
From: Milind L
Date: Mon, 23 Jan 2023 15:48:43 +0530
Subject: [PATCH] [admin] Add consumer group bindings for KIP-88, 222, 518, 396 (partial) (#1981)

* [admin] Add ListConsumerGroupOffsets and AlterConsumerGroupOffsets bindings
* [admin] Add ListConsumerGroups and DescribeConsumerGroups bindings
* Also deprecates the older ListGroups code
* [admin] Add KIP-518 changes to List/Describe Consumer Groups

Co-authored-by: Emanuele Sabellico
---
 CHANGELOG.md                                  |  13 +
 examples/AdminClient/Program.cs               | 228 ++++++++++++-
 .../AlterConsumerGroupOffsetsException.cs     |  48 +++
 .../Admin/AlterConsumerGroupOffsetsOptions.cs |  36 +++
 .../Admin/AlterConsumerGroupOffsetsReport.cs  |  52 +++
 .../Admin/AlterConsumerGroupOffsetsResult.cs  |  45 +++
 .../Admin/ConsumerGroupDescription.cs         |  62 ++++
 .../Admin/ConsumerGroupListing.cs             |  48 +++
 .../Admin/DescribeConsumerGroupsException.cs  |  54 ++++
 .../Admin/DescribeConsumerGroupsOptions.cs    |  37 +++
 .../Admin/DescribeConsumerGroupsReport.cs     |  42 +++
 .../Admin/DescribeConsumerGroupsResult.cs     |  41 +++
 .../ListConsumerGroupOffsetsException.cs      |  48 +++
 .../Admin/ListConsumerGroupOffsetsOptions.cs  |  43 +++
 .../Admin/ListConsumerGroupOffsetsReport.cs   |  52 +++
 .../Admin/ListConsumerGroupOffsetsResult.cs   |  45 +++
 .../Admin/ListConsumerGroupsException.cs      |  43 +++
 .../Admin/ListConsumerGroupsOptions.cs        |  45 +++
 .../Admin/ListConsumerGroupsReport.cs         |  52 +++
 .../Admin/ListConsumerGroupsResult.cs         |  41 +++
 src/Confluent.Kafka/Admin/MemberAssignment.cs |  31 ++
 .../Admin/MemberDescription.cs                |  49 +++
 src/Confluent.Kafka/AdminClient.cs            | 305 ++++++++++++++++++
 src/Confluent.Kafka/ConsumerGroupState.cs     |  55 ++++
 .../ConsumerGroupTopicPartitionOffsets.cs     |  59 ++++
 .../ConsumerGroupTopicPartitions.cs           |  59 ++++
 src/Confluent.Kafka/Error.cs                  |   4 +-
 src/Confluent.Kafka/Headers.cs                |   2 +-
 src/Confluent.Kafka/IAdminClient.cs           | 122 ++++++-
 src/Confluent.Kafka/IConsumer.cs              |   4 +-
 src/Confluent.Kafka/IProducer.cs              |  24 +-
 src/Confluent.Kafka/Impl/LibRdKafka.cs        | 238 +++++++++++++-
 src/Confluent.Kafka/Impl/Metadata.cs          |   1 +
 .../Impl/NativeMethods/NativeMethods.cs       | 134 +++++++-
 .../NativeMethods/NativeMethods_Alpine.cs     | 143 +++++++-
 .../NativeMethods/NativeMethods_Centos6.cs    | 143 +++++++-
 .../NativeMethods/NativeMethods_Centos7.cs    | 143 +++++++-
 src/Confluent.Kafka/Impl/SafeKafkaHandle.cs   | 283 ++++++++++++++++
 src/Confluent.Kafka/Node.cs                   |  47 +++
 ...minClient_AlterListConsumerGroupOffsets.cs | 183 +++++++++++
 .../AdminClient_ListDescribeConsumerGroups.cs | 175 ++++++++++
 .../Tests/Tests.cs                            |   6 +-
 42 files changed, 3241 insertions(+), 44 deletions(-)
 create mode 100644 src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsException.cs
 create mode 100644 src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsOptions.cs
 create mode 100644 src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsReport.cs
 create mode 100644 src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsResult.cs
 create mode 100644 src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs
 create mode 100644 src/Confluent.Kafka/Admin/ConsumerGroupListing.cs
 create mode 100644 src/Confluent.Kafka/Admin/DescribeConsumerGroupsException.cs
 create mode 100644 src/Confluent.Kafka/Admin/DescribeConsumerGroupsOptions.cs
 create mode 100644 src/Confluent.Kafka/Admin/DescribeConsumerGroupsReport.cs
 create mode 100644 src/Confluent.Kafka/Admin/DescribeConsumerGroupsResult.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsException.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsOptions.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsReport.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsResult.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupsException.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs
 create mode 100644 src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs
 create mode 100644 src/Confluent.Kafka/Admin/MemberAssignment.cs
 create mode 100644 src/Confluent.Kafka/Admin/MemberDescription.cs
 create mode 100644 src/Confluent.Kafka/ConsumerGroupState.cs
 create mode 100644 src/Confluent.Kafka/ConsumerGroupTopicPartitionOffsets.cs
 create mode 100644 src/Confluent.Kafka/ConsumerGroupTopicPartitions.cs
 create mode 100644 src/Confluent.Kafka/Node.cs
 create mode 100644 test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_AlterListConsumerGroupOffsets.cs
 create mode 100644 test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ed917f06..6ac37951b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,19 @@

 - Added `LatestCompatibilityStrict` configuration property to JsonSerializerConfig to check the compatibility with latest schema when `UseLatestVersion` is set to true.
 - Added DeleteConsumerGroupOffset to AdminClient.
+- [KIP-222](https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API)
+  Finish the remaining implementation: add Consumer Group operations to the Admin API (`DeleteGroups` is already present).
+- [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state)
+  Allow listing consumer groups per state.
+- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484)
+  Partially implemented: support for AlterConsumerGroupOffsets.
+- As a result of the above KIPs, added (#1981):
+  - `ListConsumerGroups` Admin operation. Supports listing by state.
+  - `DescribeConsumerGroups` Admin operation. Supports multiple groups.
+  - `ListConsumerGroupOffsets` Admin operation. Currently supports only
+    one group with multiple partitions. Supports the `requireStable` option.
+  - `AlterConsumerGroupOffsets` Admin operation. Currently supports only
+    one group with multiple offsets.

 ## Fixes

diff --git a/examples/AdminClient/Program.cs b/examples/AdminClient/Program.cs
index cdbf752ab..8aa7f5fd8 100644
--- a/examples/AdminClient/Program.cs
+++ b/examples/AdminClient/Program.cs
@@ -295,11 +295,225 @@ static async Task DeleteAclsAsync(string bootstrapServers, string[] commandArgs)
             }
         }

+        static async Task AlterConsumerGroupOffsetsAsync(string bootstrapServers, string[] commandArgs)
+        {
+            if (commandArgs.Length < 4)
+            {
+                Console.WriteLine("usage: .. alter-consumer-group-offsets ...
"); + Environment.ExitCode = 1; + return; + } + + var group = commandArgs[0]; + var tpoes = new List(); + for (int i = 1; i + 2 < commandArgs.Length; i += 3) { + try + { + var topic = commandArgs[i]; + var partition = Int32.Parse(commandArgs[i + 1]); + var offset = Int64.Parse(commandArgs[i + 2]); + tpoes.Add(new TopicPartitionOffset(topic, partition, offset)); + } + catch (Exception e) + { + Console.Error.WriteLine($"An error occurred while parsing arguments: {e}"); + Environment.ExitCode = 1; + return; + } + } + + var input = new List() { new ConsumerGroupTopicPartitionOffsets(group, tpoes) }; + + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) + { + try + { + var results = await adminClient.AlterConsumerGroupOffsetsAsync(input); + Console.WriteLine("Successfully altered offsets:"); + foreach(var groupResult in results) { + Console.WriteLine(groupResult); + } + + } + catch (AlterConsumerGroupOffsetsException e) + { + Console.WriteLine($"An error occurred altering offsets: {(e.Results.Any() ? e.Results[0] : null)}"); + Environment.ExitCode = 1; + return; + } + catch (KafkaException e) + { + Console.WriteLine("An error occurred altering consumer group offsets." + + $" Code: {e.Error.Code}" + + $", Reason: {e.Error.Reason}"); + Environment.ExitCode = 1; + return; + } + } + } + + static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[] commandArgs) + { + if (commandArgs.Length < 1) + { + Console.WriteLine("usage: .. list-consumer-group-offsets [ ... ]"); + Environment.ExitCode = 1; + return; + } + + var group = commandArgs[0]; + var tpes = new List(); + for (int i = 1; i + 1 < commandArgs.Length; i += 2) { + try + { + var topic = commandArgs[i]; + var partition = Int32.Parse(commandArgs[i + 1]); + tpes.Add(new TopicPartition(topic, partition)); + } + catch (Exception e) + { + Console.Error.WriteLine($"An error occurred while parsing arguments: {e}"); + Environment.ExitCode = 1; + return; + } + } + if(!tpes.Any()) + { + // In case the list is empty, request offsets for all the partitions. + tpes = null; + } + + var input = new List() { new ConsumerGroupTopicPartitions(group, tpes) }; + + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) + { + try + { + var result = await adminClient.ListConsumerGroupOffsetsAsync(input); + Console.WriteLine("Successfully listed offsets:"); + foreach(var groupResult in result) { + Console.WriteLine(groupResult); + } + } + catch (ListConsumerGroupOffsetsException e) + { + Console.WriteLine($"An error occurred listing offsets: {(e.Results.Any() ? e.Results[0] : null)}"); + Environment.ExitCode = 1; + return; + } + catch (KafkaException e) + { + Console.WriteLine("An error occurred listing consumer group offsets." + + $" Code: {e.Error.Code}" + + $", Reason: {e.Error.Reason}"); + Environment.ExitCode = 1; + return; + } + } + } + + static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) { + var timeout = TimeSpan.FromSeconds(30); + var statesList = new List(); + try + { + if (commandArgs.Length > 0) + { + timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0])); + } + if (commandArgs.Length > 1) + { + for (int i = 1; i < commandArgs.Length; i++) { + statesList.Add(Enum.Parse(commandArgs[i])); + } + } + } + catch (SystemException) + { + Console.WriteLine("usage: .. list-consumer-groups [ ... 
]"); + Environment.ExitCode = 1; + return; + } + + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) + { + try + { + var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() { + RequestTimeout = timeout, + MatchStates = statesList, + }); + Console.WriteLine(result); + } + catch (KafkaException e) + { + Console.WriteLine("An error occurred listing consumer groups." + + $" Code: {e.Error.Code}" + + $", Reason: {e.Error.Reason}"); + Environment.ExitCode = 1; + return; + } + + } + } + + + static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) { + if (commandArgs.Length < 1) + { + Console.WriteLine("usage: .. describe-consumer-groups []"); + Environment.ExitCode = 1; + return; + } + + var groupNames = commandArgs.ToList(); + var timeout = TimeSpan.FromSeconds(30); + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) + { + try + { + var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout }); + foreach (var group in descResult.ConsumerGroupDescriptions) + { + Console.WriteLine($" Group: {group.GroupId} {group.Error}"); + Console.WriteLine($" Broker: {group.Coordinator}"); + Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}"); + Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}"); + Console.WriteLine($" State: {group.State}"); + Console.WriteLine($" Members:"); + foreach (var m in group.Members) + { + Console.WriteLine($" {m.ClientId} {m.ConsumerId} {m.Host}"); + Console.WriteLine($" Assignment:"); + var topicPartitions = ""; + if (m.Assignment.TopicPartitions != null) + { + topicPartitions = String.Join(", ", m.Assignment.TopicPartitions.Select(tp => tp.ToString())); + } + Console.WriteLine($" TopicPartitions: [{topicPartitions}]"); + } + } + } + catch (KafkaException e) + { + Console.WriteLine($"An error occurred describing consumer groups: {e}"); + Environment.ExitCode = 1; + } + } + } + public static async Task Main(string[] args) { if (args.Length < 2) { - Console.WriteLine("usage: .. .."); + Console.WriteLine( + "usage: .. 
" + String.Join("|", new string[] { + "list-groups", "metadata", "library-version", "create-topic", "create-acls", + "describe-acls", "delete-acls", + "list-consumer-groups", "describe-consumer-groups", + "list-consumer-group-offsets", "alter-consumer-group-offsets" + }) + + " .."); Environment.ExitCode = 1; return; } @@ -331,6 +545,18 @@ public static async Task Main(string[] args) case "delete-acls": await DeleteAclsAsync(bootstrapServers, commandArgs); break; + case "alter-consumer-group-offsets": + await AlterConsumerGroupOffsetsAsync(bootstrapServers, commandArgs); + break; + case "list-consumer-group-offsets": + await ListConsumerGroupOffsetsAsync(bootstrapServers, commandArgs); + break; + case "list-consumer-groups": + await ListConsumerGroupsAsync(bootstrapServers, commandArgs); + break; + case "describe-consumer-groups": + await DescribeConsumerGroupsAsync(bootstrapServers, commandArgs); + break; default: Console.WriteLine($"unknown command: {command}"); break; diff --git a/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsException.cs b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsException.cs new file mode 100644 index 000000000..c662198bd --- /dev/null +++ b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsException.cs @@ -0,0 +1,48 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents an error that occured during altering consumer group offsets. + /// + public class AlterConsumerGroupOffsetsException : KafkaException + { + /// + /// Initializes a new instance of AlterConsumerGroupOffsetsException. + /// + /// + /// The result corresponding to all groups/partitions in the request + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public AlterConsumerGroupOffsetsException(List results) + : base(new Error(ErrorCode.Local_Partial, + "An error occurred altering consumer group offsets, check individual result elements")) + { + Results = results; + } + + /// + /// The result corresponding to all groups/partitions in the request + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public List Results { get; } + } +} diff --git a/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsOptions.cs b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsOptions.cs new file mode 100644 index 000000000..13e07886a --- /dev/null +++ b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsOptions.cs @@ -0,0 +1,36 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; + +namespace Confluent.Kafka.Admin +{ + /// + /// Options for the "AdminClient.AlterConsumerGroupOffsetsAsync" method. + /// + public class AlterConsumerGroupOffsetsOptions + { + /// + /// The overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. If set + /// to null, the default request timeout for the AdminClient will + /// be used. + /// + /// Default: null + /// + public TimeSpan? RequestTimeout { get; set; } + } +} diff --git a/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsReport.cs b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsReport.cs new file mode 100644 index 000000000..b85bbf54b --- /dev/null +++ b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsReport.cs @@ -0,0 +1,52 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// The per-group result for an alter consumer group offsets request + /// including errors. + /// + public class AlterConsumerGroupOffsetsReport + { + /// + /// The groupID. + /// + public string Group { get; set; } + + /// + /// List of topic TopicPartitionOffsetError containing the written offsets, + /// and per-partition errors if any. + /// + public List Partitions { get; set; } + + /// + /// Error, if any, on the group level. + /// + public Error Error { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + var errString = Error.IsError ? Error.ToString() : ""; + return $"{Group} [ {String.Join(", ", Partitions)} ] {errString}"; + } + } +} diff --git a/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsResult.cs b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsResult.cs new file mode 100644 index 000000000..248ac4ee8 --- /dev/null +++ b/src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsResult.cs @@ -0,0 +1,45 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// The per-group result for an alter consumer group offsets request. + /// + public class AlterConsumerGroupOffsetsResult + { + /// + /// The groupID. + /// + public string Group { get; set; } + + /// + /// List of topic TopicPartitionOffsetError containing the written offsets, + /// and per-partition errors if any. + /// + public List Partitions { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + return $"{Group} [ {String.Join(", ", Partitions)} ]"; + } + } +} diff --git a/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs b/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs new file mode 100644 index 000000000..ef5014187 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ConsumerGroupDescription.cs @@ -0,0 +1,62 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents a single consumer group's description in the result of a + /// describe consumer group operation. + /// + public class ConsumerGroupDescription + { + /// + /// The groupID. + /// + public string GroupId { get; set; } + + /// + /// Error, if any, of result + /// + public Error Error { get; set; } + + /// + /// Whether the consumer group is simple or not. + /// + public bool IsSimpleConsumerGroup { get; set; } + + /// + /// Partition assignor identifier. + /// + public string PartitionAssignor { get; set; } + + /// + /// Consumer group state. + /// + public ConsumerGroupState State { get; set; } + + /// + /// Consumer group coordinator (broker). + /// + public Node Coordinator { get; set; } + + /// + /// Members list. + /// + public List Members { get; set; } + } +} diff --git a/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs new file mode 100644 index 000000000..27caece2f --- /dev/null +++ b/src/Confluent.Kafka/Admin/ConsumerGroupListing.cs @@ -0,0 +1,48 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. 
+ +namespace Confluent.Kafka.Admin +{ + /// + /// Represents a single consumer group in the result of a list consumer + /// groups operation. + /// + public class ConsumerGroupListing + { + /// + /// The groupID. + /// + public string GroupId { get; set; } + + /// + /// The state of the consumer group. + /// + public ConsumerGroupState State { get; set; } + + /// + /// Whether the consumer group is simple or not. + /// + public bool IsSimpleConsumerGroup { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() + { + return $"{GroupId}, State = {State}, IsSimpleConsumerGroup = {IsSimpleConsumerGroup}"; + } + } +} diff --git a/src/Confluent.Kafka/Admin/DescribeConsumerGroupsException.cs b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsException.cs new file mode 100644 index 000000000..a722e01c8 --- /dev/null +++ b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsException.cs @@ -0,0 +1,54 @@ + +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Linq; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents an error that occured during a describe consumer group operation. + /// + public class DescribeConsumerGroupsException : KafkaException + { + /// + /// Initialize a new instance of DescribeConsumerGroupsException. + /// + /// + /// The result corresponding to all groups in the request + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public DescribeConsumerGroupsException(DescribeConsumerGroupsReport results) + : base(new Error(ErrorCode.Local_Partial, + "An error occurred describing consumer groups: [" + + String.Join(", ", results.ConsumerGroupDescriptions.Where(r => r.Error.IsError).Select(r => r.GroupId)) + + "]: [" + String.Join(", ", results.ConsumerGroupDescriptions.Where(r => r.Error.IsError).Select(r => r.Error)) + + "].")) + { + this.Results = results; + } + + /// + /// The result corresponding to all groups in the request, + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public DescribeConsumerGroupsReport Results { get; } + } +} diff --git a/src/Confluent.Kafka/Admin/DescribeConsumerGroupsOptions.cs b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsOptions.cs new file mode 100644 index 000000000..028501e31 --- /dev/null +++ b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsOptions.cs @@ -0,0 +1,37 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Options for the "AdminClient.DescribeConsumerGroupsOptions" method. + /// + public class DescribeConsumerGroupsOptions + { + /// + /// The overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. If set + /// to null, the default request timeout for the AdminClient will + /// be used. + /// + /// Default: null + /// + public TimeSpan? RequestTimeout { get; set; } + } +} diff --git a/src/Confluent.Kafka/Admin/DescribeConsumerGroupsReport.cs b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsReport.cs new file mode 100644 index 000000000..9108c2ded --- /dev/null +++ b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsReport.cs @@ -0,0 +1,42 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents the result of a describe consumer group operation, where one + /// or more of the results has an error. + /// + public class DescribeConsumerGroupsReport + { + /// + /// List of consumer group descriptions. + /// + public List ConsumerGroupDescriptions { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + string res = "Groups:\n"; + foreach (ConsumerGroupDescription cgd in ConsumerGroupDescriptions) { + res += "\t" + cgd.ToString() + "\n"; + } + return res; + } + } +} diff --git a/src/Confluent.Kafka/Admin/DescribeConsumerGroupsResult.cs b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsResult.cs new file mode 100644 index 000000000..13883c10f --- /dev/null +++ b/src/Confluent.Kafka/Admin/DescribeConsumerGroupsResult.cs @@ -0,0 +1,41 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. 
+using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents the result of a describe consumer group operation. + /// + public class DescribeConsumerGroupsResult + { + /// + /// List of consumer group descriptions. + /// + public List ConsumerGroupDescriptions { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + string res = "Groups:\n"; + foreach (ConsumerGroupDescription cgd in ConsumerGroupDescriptions) { + res += "\t" + cgd.ToString() + "\n"; + } + return res; + } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsException.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsException.cs new file mode 100644 index 000000000..c3849c9e7 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsException.cs @@ -0,0 +1,48 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents an error that occured during listing consumer group offsets. + /// + public class ListConsumerGroupOffsetsException : KafkaException + { + /// + /// Initializes a new instance of ListConsumerGroupOffsetsException. + /// + /// + /// The result corresponding to all groups/partitions in the request + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public ListConsumerGroupOffsetsException(List results) + : base(new Error(ErrorCode.Local_Partial, + "An error occurred listing consumer group offsets, check individual result elements")) + { + Results = results; + } + + /// + /// The result corresponding to all groups/partitions in the request + /// (whether or not they were in error). At least one of these + /// results will be in error. + /// + public List Results { get; } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsOptions.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsOptions.cs new file mode 100644 index 000000000..786ff5d33 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsOptions.cs @@ -0,0 +1,43 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; + +namespace Confluent.Kafka.Admin +{ + /// + /// Options for the "AdminClient.ListConsumerGroupOffsetsAsync" method. 
+ /// + public class ListConsumerGroupOffsetsOptions + { + /// + /// The overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. If set + /// to null, the default request timeout for the AdminClient will + /// be used. + /// + /// Default: null + /// + public TimeSpan? RequestTimeout { get; set; } + + /// + /// Decides if the broker should return stable offsets (transaction-committed). + /// + /// Default: false + /// + public bool RequireStableOffsets { get; set; } = false; + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsReport.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsReport.cs new file mode 100644 index 000000000..6547bddb0 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsReport.cs @@ -0,0 +1,52 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// The per-group result for a list consumer group offsets request, and + /// an error. + /// + public class ListConsumerGroupOffsetsReport + { + /// + /// The groupID. + /// + public string Group { get; set; } + + /// + /// List of topic TopicPartitionOffsetError containing the read offsets, + /// and errors if any. + /// + public List Partitions { get; set; } + + /// + /// Error, if any, on a group-level. + /// + public Error Error { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + var errString = Error.IsError ? Error.ToString() : ""; + return $"{Group} [ {String.Join(", ", Partitions)} ] {errString}"; + } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsResult.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsResult.cs new file mode 100644 index 000000000..3db341e3d --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupOffsetsResult.cs @@ -0,0 +1,45 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// The per-group result for a list consumer group offsets request. + /// + public class ListConsumerGroupOffsetsResult + { + /// + /// The groupID. 
+ /// + public string Group { get; set; } + + /// + /// List of topic TopicPartitionOffsetError containing the read offsets, + /// and errors if any. + /// + public List Partitions { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + return $"{Group} [ {String.Join(", ", Partitions)} ]"; + } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsException.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsException.cs new file mode 100644 index 000000000..eec576d16 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsException.cs @@ -0,0 +1,43 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents an error that occured during a list consumer group operation. + /// + public class ListConsumerGroupsException : KafkaException + { + /// + /// Initialize a new instance of ListConsumerGroupsException. + /// + /// + /// The result corresponding to all groups in the request + /// + public ListConsumerGroupsException(ListConsumerGroupsReport report) + : base(new Error(ErrorCode.Local_Partial, + "error listing consumer groups")) + { + this.Results = report; + } + + /// + /// The result corresponding to all groups and including all errors. + /// Results.Errors will be non-empty and should be checked. + /// + public ListConsumerGroupsReport Results { get; } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs new file mode 100644 index 000000000..1bc23b500 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsOptions.cs @@ -0,0 +1,45 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System; +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Options for the "AdminClient.ListConsumerGroupsAsync" method. + /// + public class ListConsumerGroupsOptions + { + /// + /// The overall request timeout, including broker lookup, request + /// transmission, operation time on broker, and response. If set + /// to null, the default request timeout for the AdminClient will + /// be used. + /// + /// Default: null + /// + public TimeSpan? RequestTimeout { get; set; } + + /// + /// An enumerable with the states to query, null to query for all + /// the states. 
+ /// + /// Default: null + /// + public IEnumerable MatchStates { get; set; } = null; + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs new file mode 100644 index 000000000..fd1ea92a7 --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsReport.cs @@ -0,0 +1,52 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents the result of a list consumer group operation with an error. + /// + public class ListConsumerGroupsReport + { + /// + /// List of valid consumer group listings. + /// + public List Valid { get; set; } + + /// + /// List of non-client level errors encountered while listing groups. + /// + public List Errors { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + string res = "Groups:\n"; + foreach (ConsumerGroupListing cgl in Valid) { + res += "\t" + cgl.ToString() + "\n"; + } + if (Errors.Count != 0) { + res += "Errors:\n"; + foreach (Error err in Errors) { + res += "\t" + err.ToString() + "\n"; + } + } + return res; + } + } +} diff --git a/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs b/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs new file mode 100644 index 000000000..71c9dd19d --- /dev/null +++ b/src/Confluent.Kafka/Admin/ListConsumerGroupsResult.cs @@ -0,0 +1,41 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// Represents the result of a list consumer group operation. + /// + public class ListConsumerGroupsResult + { + /// + /// List of valid consumer group listings. + /// + public List Valid { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() { + string res = "Groups:\n"; + foreach (ConsumerGroupListing cgl in Valid) { + res += "\t" + cgl.ToString() + "\n"; + } + return res; + } + } +} diff --git a/src/Confluent.Kafka/Admin/MemberAssignment.cs b/src/Confluent.Kafka/Admin/MemberAssignment.cs new file mode 100644 index 000000000..464a9ce96 --- /dev/null +++ b/src/Confluent.Kafka/Admin/MemberAssignment.cs @@ -0,0 +1,31 @@ +// Copyright 2022 Confluent Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka.Admin +{ + /// + /// MemberAssignment represents the assignment of a consumer group member. + /// + public class MemberAssignment + { + /// + /// Partitions assigned to current member. + /// + public List TopicPartitions { get; set; } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/Admin/MemberDescription.cs b/src/Confluent.Kafka/Admin/MemberDescription.cs new file mode 100644 index 000000000..3e8710f75 --- /dev/null +++ b/src/Confluent.Kafka/Admin/MemberDescription.cs @@ -0,0 +1,49 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka.Admin +{ + /// + /// MemberDescription represents the description of a consumer group member + /// + public class MemberDescription + { + /// + /// Client id. + /// + public string ClientId { get; set; } + + /// + /// Group instance id. + /// + public string GroupInstanceId { get; set; } + + /// + /// Consumer id. + /// + public string ConsumerId { get; set; } + + /// + /// Group member host. + /// + public string Host { get; set; } + + /// + /// Member assignment. + /// + public MemberAssignment Assignment { get; set; } + } +} \ No newline at end of file diff --git a/src/Confluent.Kafka/AdminClient.cs b/src/Confluent.Kafka/AdminClient.cs index f720570c0..bd2d58246 100644 --- a/src/Confluent.Kafka/AdminClient.cs +++ b/src/Confluent.Kafka/AdminClient.cs @@ -224,6 +224,145 @@ private DeleteConsumerGroupOffsetsReport extractDeleteConsumerGroupOffsetsReport }; } + private List extractListConsumerGroupOffsetsResults(IntPtr resultPtr) + { + var resultGroupsPtr = Librdkafka.ListConsumerGroupOffsets_result_groups(resultPtr, out UIntPtr resultCountPtr); + IntPtr[] resultGroupsPtrArr = new IntPtr[(int)resultCountPtr]; + Marshal.Copy(resultGroupsPtr, resultGroupsPtrArr, 0, (int)resultCountPtr); + + return resultGroupsPtrArr.Select(resultGroupPtr => { + + // Construct the TopicPartitionOffsetError list from internal list. 
+ var partitionsPtr = Librdkafka.group_result_partitions(resultGroupPtr); + + return new ListConsumerGroupOffsetsReport { + Group = PtrToStringUTF8(Librdkafka.group_result_name(resultGroupPtr)), + Error = new Error(Librdkafka.group_result_error(resultGroupPtr), false), + Partitions = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitionsPtr), + }; + }).ToList(); + } + + private List extractAlterConsumerGroupOffsetsResults(IntPtr resultPtr) + { + var resultGroupsPtr = Librdkafka.AlterConsumerGroupOffsets_result_groups(resultPtr, out UIntPtr resultCountPtr); + IntPtr[] resultGroupsPtrArr = new IntPtr[(int)resultCountPtr]; + Marshal.Copy(resultGroupsPtr, resultGroupsPtrArr, 0, (int)resultCountPtr); + + return resultGroupsPtrArr.Select(resultGroupPtr => { + + // Construct the TopicPartitionOffsetError list from internal list. + var partitionsPtr = Librdkafka.group_result_partitions(resultGroupPtr); + + return new AlterConsumerGroupOffsetsReport { + Group = PtrToStringUTF8(Librdkafka.group_result_name(resultGroupPtr)), + Error = new Error(Librdkafka.group_result_error(resultGroupPtr), false), + Partitions = SafeKafkaHandle.GetTopicPartitionOffsetErrorList(partitionsPtr), + }; + }).ToList(); + } + + private ListConsumerGroupsReport extractListConsumerGroupsResults(IntPtr resultPtr) + { + var result = new ListConsumerGroupsReport() { + Valid = new List(), + Errors = new List(), + }; + + var validResultsPtr = Librdkafka.ListConsumerGroups_result_valid(resultPtr, out UIntPtr resultCountPtr); + if ((int)resultCountPtr != 0) + { + IntPtr[] consumerGroupListingPtrArr = new IntPtr[(int)resultCountPtr]; + Marshal.Copy(validResultsPtr, consumerGroupListingPtrArr, 0, (int)resultCountPtr); + result.Valid = consumerGroupListingPtrArr.Select(cglPtr => { + return new ConsumerGroupListing() { + GroupId = PtrToStringUTF8(Librdkafka.ConsumerGroupListing_group_id(cglPtr)), + IsSimpleConsumerGroup = + (int)Librdkafka.ConsumerGroupListing_is_simple_consumer_group(cglPtr) == 1, + State = Librdkafka.ConsumerGroupListing_state(cglPtr), + }; + }).ToList(); + } + + + var errorsPtr = Librdkafka.ListConsumerGroups_result_errors(resultPtr, out UIntPtr errorCountPtr); + if ((int)errorCountPtr != 0) + { + IntPtr[] errorsPtrArr = new IntPtr[(int)errorCountPtr]; + Marshal.Copy(errorsPtr, errorsPtrArr, 0, (int)errorCountPtr); + result.Errors = errorsPtrArr.Select(errorPtr => new Error(errorPtr)).ToList(); + } + + return result; + } + + private DescribeConsumerGroupsReport extractDescribeConsumerGroupsResults(IntPtr resultPtr) { + var groupsPtr = Librdkafka.DescribeConsumerGroups_result_groups(resultPtr, out UIntPtr groupsCountPtr); + + var result = new DescribeConsumerGroupsReport() { + ConsumerGroupDescriptions = new List() + }; + + if ((int)groupsCountPtr == 0) + return result; + + IntPtr[] groupPtrArr = new IntPtr[(int)groupsCountPtr]; + Marshal.Copy(groupsPtr, groupPtrArr, 0, (int)groupsCountPtr); + + result.ConsumerGroupDescriptions = groupPtrArr.Select(groupPtr => { + + var coordinatorPtr = Librdkafka.ConsumerGroupDescription_coordinator(groupPtr); + var coordinator = new Node() { + Id = (int)Librdkafka.Node_id(coordinatorPtr), + Host = PtrToStringUTF8(Librdkafka.Node_host(coordinatorPtr)), + Port = (int)Librdkafka.Node_port(coordinatorPtr), + }; + + var memberCount = (int)Librdkafka.ConsumerGroupDescription_member_count(groupPtr); + var members = new List(); + for (int midx = 0; midx < memberCount; midx++) + { + var memberPtr = Librdkafka.ConsumerGroupDescription_member(groupPtr, (IntPtr)midx); + var member = new 
MemberDescription() { + ClientId = + PtrToStringUTF8(Librdkafka.MemberDescription_client_id(memberPtr)), + ConsumerId = + PtrToStringUTF8(Librdkafka.MemberDescription_consumer_id(memberPtr)), + Host = + PtrToStringUTF8(Librdkafka.MemberDescription_host(memberPtr)), + GroupInstanceId = + PtrToStringUTF8(Librdkafka.MemberDescription_group_instance_id(memberPtr)), + }; + var assignmentPtr = Librdkafka.MemberDescription_assignment(memberPtr); + var topicPartitionPtr = Librdkafka.MemberAssignment_topic_partitions(assignmentPtr); + member.Assignment = new MemberAssignment(); + if (topicPartitionPtr != IntPtr.Zero) + { + member.Assignment.TopicPartitions = SafeKafkaHandle.GetTopicPartitionList(topicPartitionPtr); + } + members.Add(member); + } + + var desc = new ConsumerGroupDescription() { + GroupId = + PtrToStringUTF8(Librdkafka.ConsumerGroupDescription_group_id(groupPtr)), + Error = + new Error(Librdkafka.ConsumerGroupDescription_error(groupPtr)), + IsSimpleConsumerGroup = + (int)Librdkafka.ConsumerGroupDescription_is_simple_consumer_group(groupPtr) == 1, + PartitionAssignor = + PtrToStringUTF8(Librdkafka.ConsumerGroupDescription_partition_assignor(groupPtr)), + State = + Librdkafka.ConsumerGroupDescription_state(groupPtr), + Coordinator = coordinator, + Members = members, + }; + return desc; + }).ToList(); + + return result; + } + private Task StartPollTask(CancellationToken ct) => Task.Factory.StartNew(() => { @@ -581,6 +720,117 @@ private Task StartPollTask(CancellationToken ct) } break; + case Librdkafka.EventType.AlterConsumerGroupOffsets_Result: + { + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + break; + } + var results = extractAlterConsumerGroupOffsetsResults(eventPtr); + if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new AlterConsumerGroupOffsetsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetResult( + results + .Select(r => new AlterConsumerGroupOffsetsResult() { + Group = r.Group, + Partitions = r.Partitions + }) + .ToList() + )); + } + break; + } + + case Librdkafka.EventType.ListConsumerGroupOffsets_Result: + { + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + break; + } + var results = extractListConsumerGroupOffsetsResults(eventPtr); + if (results.Any(r => r.Error.IsError) || results.Any(r => r.Partitions.Any(p => p.Error.IsError))) + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetException( + new ListConsumerGroupOffsetsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource>)adminClientResult).TrySetResult( + results + .Select(r => new ListConsumerGroupOffsetsResult() { Group = r.Group, Partitions = r.Partitions }) + .ToList() + )); + } + break; + } + + case Librdkafka.EventType.ListConsumerGroups_Result: + { + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + break; + } + var results = extractListConsumerGroupsResults(eventPtr); + if (results.Errors.Count() != 0) + { + Task.Run(() 
=> + ((TaskCompletionSource)adminClientResult).TrySetException( + new ListConsumerGroupsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new ListConsumerGroupsResult() { Valid = results.Valid } + )); + } + break; + } + + case Librdkafka.EventType.DescribeConsumerGroups_Result: + { + if (errorCode != ErrorCode.NoError) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new KafkaException(kafkaHandle.CreatePossiblyFatalError(errorCode, errorStr)))); + break; + } + var results = extractDescribeConsumerGroupsResults(eventPtr); + if (results.ConsumerGroupDescriptions.Any(desc => desc.Error.IsError)) + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetException( + new DescribeConsumerGroupsException(results))); + } + else + { + Task.Run(() => + ((TaskCompletionSource)adminClientResult).TrySetResult( + new DescribeConsumerGroupsResult() { ConsumerGroupDescriptions = results.ConsumerGroupDescriptions } + )); + } + break; + } + default: // Should never happen. throw new InvalidOperationException($"Unknown result type: {type}"); @@ -622,6 +872,10 @@ private Task StartPollTask(CancellationToken ct) { Librdkafka.EventType.CreateAcls_Result, typeof(TaskCompletionSource) }, { Librdkafka.EventType.DescribeAcls_Result, typeof(TaskCompletionSource) }, { Librdkafka.EventType.DeleteAcls_Result, typeof(TaskCompletionSource>) }, + { Librdkafka.EventType.AlterConsumerGroupOffsets_Result, typeof(TaskCompletionSource>) }, + { Librdkafka.EventType.ListConsumerGroupOffsets_Result, typeof(TaskCompletionSource>) }, + { Librdkafka.EventType.ListConsumerGroups_Result, typeof(TaskCompletionSource) }, + { Librdkafka.EventType.DescribeConsumerGroups_Result, typeof(TaskCompletionSource) }, }; @@ -957,5 +1211,56 @@ public Task> DeleteAclsAsync(IEnumerable + /// Refer to + /// + public Task> AlterConsumerGroupOffsetsAsync(IEnumerable groupPartitions, AlterConsumerGroupOffsetsOptions options = null) + { + var completionSource = new TaskCompletionSource>(); + var gch = GCHandle.Alloc(completionSource); + Handle.LibrdkafkaHandle.AlterConsumerGroupOffsets( + groupPartitions, options, resultQueue, + GCHandle.ToIntPtr(gch)); + return completionSource.Task; + } + + /// + /// Refer to + /// + public Task> ListConsumerGroupOffsetsAsync(IEnumerable groupPartitions, ListConsumerGroupOffsetsOptions options = null) + { + var completionSource = new TaskCompletionSource>(); + var gch = GCHandle.Alloc(completionSource); + Handle.LibrdkafkaHandle.ListConsumerGroupOffsets( + groupPartitions, options, resultQueue, + GCHandle.ToIntPtr(gch)); + return completionSource.Task; + } + + /// + /// Refer to + /// + public Task ListConsumerGroupsAsync(ListConsumerGroupsOptions options = null) { + var completionSource = new TaskCompletionSource(); + var gch = GCHandle.Alloc(completionSource); + Handle.LibrdkafkaHandle.ListConsumerGroups( + options, resultQueue, + GCHandle.ToIntPtr(gch)); + return completionSource.Task; + } + + + /// + /// Refer to + /// + public Task DescribeConsumerGroupsAsync(IEnumerable groups, DescribeConsumerGroupsOptions options = null) { + var completionSource = new TaskCompletionSource(); + var gch = GCHandle.Alloc(completionSource); + Handle.LibrdkafkaHandle.DescribeConsumerGroups( + groups, options, resultQueue, + GCHandle.ToIntPtr(gch)); + return completionSource.Task; + } } } diff --git a/src/Confluent.Kafka/ConsumerGroupState.cs b/src/Confluent.Kafka/ConsumerGroupState.cs new file mode 100644 index 
000000000..b03525b24 --- /dev/null +++ b/src/Confluent.Kafka/ConsumerGroupState.cs @@ -0,0 +1,55 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + + +namespace Confluent.Kafka +{ + /// + /// Enumerates the different consumer group states. + /// + public enum ConsumerGroupState : int + { + /// + /// Unknown + /// + Unknown = 0, + + /// + /// Preparing rebalance + /// + PreparingRebalance = 1, + + /// + /// Completing rebalance + /// + CompletingRebalance = 2, + + /// + /// Stable state + /// + Stable = 3, + + /// + /// Dead + /// + Dead = 4, + + /// + /// Empty + /// + Empty = 5, + }; +} diff --git a/src/Confluent.Kafka/ConsumerGroupTopicPartitionOffsets.cs b/src/Confluent.Kafka/ConsumerGroupTopicPartitionOffsets.cs new file mode 100644 index 000000000..26a483686 --- /dev/null +++ b/src/Confluent.Kafka/ConsumerGroupTopicPartitionOffsets.cs @@ -0,0 +1,59 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka +{ + /// + /// Represents a Kafka tuple (consumer group, list of TopicPartitionOffsets). + /// + public class ConsumerGroupTopicPartitionOffsets + { + /// + /// Initializes a new ConsumerGroupTopicPartitionOffsets instance. + /// + /// + /// Kafka consumer group ID. + /// + /// + /// A list of Kafka (topic, partition) tuples. + /// + public ConsumerGroupTopicPartitionOffsets(string group, List topicPartitionOffsets) { + this.TopicPartitionOffsets = topicPartitionOffsets; + this.Group = group; + } + + /// + /// Gets the list of Kafka (topic, partition) tuples. + /// + public List TopicPartitionOffsets { get; } + + /// + /// Gets the Kafka consumer group ID. + /// + public string Group { get; } + + /// + /// Returns a string representation of the ConsumerGroupTopicPartitionOffsets object. + /// + /// + /// A string that represents the ConsumerGroupTopicPartitionOffsets object. + /// + public override string ToString() + => $"{Group} [{TopicPartitionOffsets}]"; + } +} diff --git a/src/Confluent.Kafka/ConsumerGroupTopicPartitions.cs b/src/Confluent.Kafka/ConsumerGroupTopicPartitions.cs new file mode 100644 index 000000000..d5ac0444b --- /dev/null +++ b/src/Confluent.Kafka/ConsumerGroupTopicPartitions.cs @@ -0,0 +1,59 @@ +// Copyright 2022 Confluent Inc. 
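As a rough illustration of how the ConsumerGroupTopicPartitionOffsets tuple above is meant to feed the new AlterConsumerGroupOffsets binding (group, topic and offsets are placeholders; per the changelog, only one group per request is currently supported):

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Confluent.Kafka;

    public static class AlterOffsetsExample
    {
        public static async Task ResetOffsetsAsync(IAdminClient adminClient)
        {
            // One group per call (current limitation), any number of partitions.
            var request = new ConsumerGroupTopicPartitionOffsets("my-group",
                new List<TopicPartitionOffset>
                {
                    new TopicPartitionOffset(new TopicPartition("my-topic", 0), new Offset(42)),
                    new TopicPartitionOffset(new TopicPartition("my-topic", 1), Offset.Beginning),
                });

            var results = await adminClient.AlterConsumerGroupOffsetsAsync(new[] { request });
            foreach (var result in results)
            {
                Console.WriteLine($"Altered committed offsets for group {result.Group}");
            }
        }
    }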
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +using System.Collections.Generic; + +namespace Confluent.Kafka +{ + /// + /// Represents a Kafka tuple (consumer group, list of TopicPartions). + /// + public class ConsumerGroupTopicPartitions + { + /// + /// Initializes a new ConsumerGroupTopicPartitions instance. + /// + /// + /// Kafka consumer group ID. + /// + /// + /// A list of Kafka (topic, partition) tuples. + /// + public ConsumerGroupTopicPartitions(string group, List topicPartitions) { + this.TopicPartitions = topicPartitions; + this.Group = group; + } + + /// + /// Gets the list of Kafka (topic, partition) tuples. + /// + public List TopicPartitions { get; } + + /// + /// Gets the Kafka consumer group ID. + /// + public string Group { get; } + + /// + /// Returns a string representation of the ConsumerGroupTopicPartitions object. + /// + /// + /// A string that represents the ConsumerGroupTopicPartitions object. + /// + public override string ToString() + => $"{Group} [{TopicPartitions}]"; + } +} diff --git a/src/Confluent.Kafka/Error.cs b/src/Confluent.Kafka/Error.cs index 7e9dbb6ea..3000041fd 100644 --- a/src/Confluent.Kafka/Error.cs +++ b/src/Confluent.Kafka/Error.cs @@ -117,8 +117,8 @@ public Error(ErrorCode code) /// /// Whether or not the error is fatal. /// - /// - /// + /// + /// /// public Error(ErrorCode code, string reason, bool isFatal) { diff --git a/src/Confluent.Kafka/Headers.cs b/src/Confluent.Kafka/Headers.cs index d862c8978..aedc910ec 100644 --- a/src/Confluent.Kafka/Headers.cs +++ b/src/Confluent.Kafka/Headers.cs @@ -72,7 +72,7 @@ public void Add(Header header) /// /// The value of the latest element in the collection with the specified key. /// - /// + /// /// The key was not present in the collection. /// public byte[] GetLastBytes(string key) diff --git a/src/Confluent.Kafka/IAdminClient.cs b/src/Confluent.Kafka/IAdminClient.cs index 06103a7bf..8957a5b61 100644 --- a/src/Confluent.Kafka/IAdminClient.cs +++ b/src/Confluent.Kafka/IAdminClient.cs @@ -29,6 +29,8 @@ namespace Confluent.Kafka public interface IAdminClient : IClient { /// + /// DEPRECATED. + /// Superseded by ListConsumerGroups and DescribeConsumerGroups. /// Get information pertaining to all groups in /// the Kafka cluster (blocking) /// @@ -42,6 +44,8 @@ public interface IAdminClient : IClient /// + /// DEPRECATED. + /// Superseded by ListConsumerGroups and DescribeConsumerGroups. /// Get information pertaining to a particular /// group in the Kafka cluster (blocking). /// @@ -237,15 +241,15 @@ public interface IAdminClient : IClient /// /// The options to use when creating the ACL bindings. /// - /// + /// /// Thrown if param is null /// or a is null or /// a is null. /// - /// + /// /// Thrown if the param is empty. /// - /// + /// /// Thrown if any of the constituent results is in /// error. 
The entire result (which may contain /// constituent results that are not in error) is @@ -273,12 +277,12 @@ public interface IAdminClient : IClient /// /// The options to use when describing ACL bindings. /// - /// + /// /// Thrown if param is null /// or any of and /// is null. /// - /// + /// /// Thrown if the corresponding result is in /// error. The entire result is /// available via the @@ -304,15 +308,15 @@ public interface IAdminClient : IClient /// /// The options to use when describing ACL bindings. /// - /// + /// /// Thrown if param is null /// or any of and /// is null. /// - /// + /// /// Thrown if the param is empty. /// - /// + /// /// Thrown if any of the constituent results is in /// error. The entire result (which may contain /// constituent results that are not in error) is @@ -343,6 +347,108 @@ public interface IAdminClient : IClient /// Task DeleteConsumerGroupOffsetsAsync(String group, IEnumerable partitions, DeleteConsumerGroupOffsetsOptions options = null); + /// + /// Alters consumer group offsets for a number of topic partitions. + /// + /// + /// A IEnumerable of ConsumerGroupTopicPartitionOffsets, each denoting the group and the + /// TopicPartitionOffsets associated with that group to alter the offsets for. + /// The Count of the IEnumerable must exactly be 1. + /// + /// + /// The options to use when altering consumer group offsets. + /// + /// + /// Thrown if the has a count not equal + /// to 1, or if any of the topic names are null. + /// + /// + /// Thrown if any of the constituent results is in + /// error. The entire result (which may contain + /// constituent results that are not in error) is + /// available via the + /// property of the exception. + /// + /// + /// A Task returning a List of . + /// + Task> AlterConsumerGroupOffsetsAsync(IEnumerable groupPartitions, AlterConsumerGroupOffsetsOptions options = null); + + /// + /// Lists consumer group offsets for a number of topic partitions. + /// + /// + /// A IEnumerable of ConsumerGroupTopicPartitions, each denoting the group and the + /// TopicPartitions associated with that group to fetch the offsets for. + /// The Count of the IEnumerable must exactly be 1. + /// + /// + /// The options to use when listing consumer group offsets. + /// + /// + /// Thrown if the has a count not equal + /// to 1, or if any of the topic names are null. + /// + /// + /// Thrown if any of the constituent results is in + /// error. The entire result (which may contain + /// constituent results that are not in error) is + /// available via the + /// property of the exception. + /// + /// + /// A Task returning a List of . + /// + Task> ListConsumerGroupOffsetsAsync(IEnumerable groupPartitions, ListConsumerGroupOffsetsOptions options = null); + + /// + /// Lists consumer groups in the cluster. + /// + /// + /// The options to use while listing consumer groups. + /// + /// + /// Thrown if there is any client-level error. + /// + /// + /// Thrown if any of the constituent results is in + /// error. The entire result (which may contain + /// constituent results that are not in error) is + /// available via the + /// property of the exception. + /// + /// + /// A ListConsumerGroupsResult, which contains a List of + /// and a + /// List of Errors. + /// + Task ListConsumerGroupsAsync(ListConsumerGroupsOptions options = null); + + /// + /// Describes consumer groups in the cluster. + /// + /// + /// The list of groups to describe. This can be set + /// to null to describe all groups. 
+ /// + /// + /// The options to use while describing consumer groups. + /// + /// + /// Thrown if there is any client-level error. + /// + /// + /// Thrown if any of the constituent results is in + /// error. The entire result (which may contain + /// constituent results that are not in error) is + /// available via the + /// property of the exception. + /// + /// + /// A List of . + /// + Task DescribeConsumerGroupsAsync( + IEnumerable groups, DescribeConsumerGroupsOptions options = null); } } diff --git a/src/Confluent.Kafka/IConsumer.cs b/src/Confluent.Kafka/IConsumer.cs index 5105f0845..38c3a5a71 100644 --- a/src/Confluent.Kafka/IConsumer.cs +++ b/src/Confluent.Kafka/IConsumer.cs @@ -71,14 +71,14 @@ public interface IConsumer : IClient /// as a side-effect of calling this method /// (on the same thread). /// - /// + /// /// Thrown /// when a call to this method is unsuccessful /// for any reason (except cancellation by /// user). Inspect the Error property of the /// exception for detailed information. /// - /// + /// /// Thrown on cancellation. /// ConsumeResult Consume(CancellationToken cancellationToken = default(CancellationToken)); diff --git a/src/Confluent.Kafka/IProducer.cs b/src/Confluent.Kafka/IProducer.cs index 4f9602885..cb3501cdc 100644 --- a/src/Confluent.Kafka/IProducer.cs +++ b/src/Confluent.Kafka/IProducer.cs @@ -50,14 +50,14 @@ public interface IProducer : IClient /// report corresponding to the produce request, /// or an exception if an error occured. /// - /// + /// /// Thrown in response to any produce request /// that was unsuccessful for any reason /// (excluding user application logic errors). /// The Error property of the exception provides /// more detailed information. /// - /// + /// /// Thrown in response to invalid argument values. /// Task> ProduceAsync( @@ -93,7 +93,7 @@ public interface IProducer : IClient /// The Error property of the exception provides /// more detailed information. /// - /// + /// /// Thrown in response to invalid argument values. /// Task> ProduceAsync( @@ -119,7 +119,7 @@ public interface IProducer : IClient /// with a delivery report corresponding to the /// produce request (if enabled). /// - /// + /// /// Thrown in response to any error that is known /// immediately (excluding user application logic /// errors), for example ErrorCode.Local_QueueFull. @@ -129,10 +129,10 @@ public interface IProducer : IClient /// the exception / delivery report provides more /// detailed information. /// - /// + /// /// Thrown in response to invalid argument values. /// - /// + /// /// Thrown in response to error conditions that /// reflect an error in the application logic of /// the calling application. @@ -169,10 +169,10 @@ public interface IProducer : IClient /// exception / delivery report provides more detailed /// information. /// - /// + /// /// Thrown in response to invalid argument values. /// - /// + /// /// Thrown in response to error conditions that reflect /// an error in the application logic of the calling /// application. @@ -526,18 +526,18 @@ public interface IProducer : IClient /// /// The maximum length of time this method may block. /// - /// + /// /// Thrown if group metadata is invalid. /// - /// + /// /// Thrown if the application must call AbortTransaction and /// start a new transaction with BeginTransaction if it /// wishes to proceed with transactions. /// - /// + /// /// Thrown if an error occured, and the operation may be retried. /// - /// + /// /// Thrown on all other errors. 
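A companion sketch for the offset-listing declarations above, again with placeholder names; the RequireStable option property (the KIP-88 stable-offset fetch) and the shape of the per-partition results are assumed from the option and result files added in this patch.

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;

    public static class ListOffsetsExample
    {
        public static async Task ShowCommittedOffsetsAsync(IAdminClient adminClient)
        {
            // One group per call (current limitation); listing specific partitions.
            var request = new ConsumerGroupTopicPartitions("my-group",
                new List<TopicPartition> { new TopicPartition("my-topic", 0) });

            try
            {
                var results = await adminClient.ListConsumerGroupOffsetsAsync(
                    new[] { request },
                    new ListConsumerGroupOffsetsOptions { RequireStable = true });

                foreach (var result in results)
                {
                    foreach (var partition in result.Partitions)
                    {
                        Console.WriteLine(
                            $"{result.Group} {partition.Topic} [{partition.Partition}]: {partition.Offset}");
                    }
                }
            }
            catch (ListConsumerGroupOffsetsException e)
            {
                // Partition-level failures surface through the operation-specific
                // exception; the partially successful results remain attached to it.
                Console.WriteLine($"Listing consumer group offsets failed: {e.Message}");
            }
        }
    }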
/// void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout); diff --git a/src/Confluent.Kafka/Impl/LibRdKafka.cs b/src/Confluent.Kafka/Impl/LibRdKafka.cs index 6b18da075..10dc89d36 100644 --- a/src/Confluent.Kafka/Impl/LibRdKafka.cs +++ b/src/Confluent.Kafka/Impl/LibRdKafka.cs @@ -64,7 +64,11 @@ internal enum AdminOp DeleteConsumerGroupOffsets = 8, CreateAcls = 9, DescribeAcls = 10, - DeleteAcls = 11 + DeleteAcls = 11, + ListConsumerGroups = 12, + DescribeConsumerGroups = 13, + ListConsumerGroupOffsets = 14, + AlterConsumerGroupOffsets = 15, } public enum EventType : int @@ -88,6 +92,10 @@ public enum EventType : int CreateAcls_Result = 0x400, DescribeAcls_Result = 0x800, DeleteAcls_Result = 0x1000, + ListConsumerGroups_Result = 0x2000, + DescribeConsumerGroups_Result = 0x4000, + ListConsumerGroupOffsets_Result = 0x8000, + AlterConsumerGroupOffsets_Result = 0x10000, } // Minimum librdkafka version. @@ -266,6 +274,8 @@ static bool SetDelegates(Type nativeMethodsClass) _AdminOptions_set_incremental = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_incremental").CreateDelegate(typeof(Func)); _AdminOptions_set_broker = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_broker").CreateDelegate(typeof(Func)); _AdminOptions_set_opaque = (Action)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_opaque").CreateDelegate(typeof(Action)); + _AdminOptions_set_require_stable_offsets = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_require_stable_offsets").CreateDelegate(typeof(Func)); + _AdminOptions_set_match_consumer_group_states = (Func)methods.Single(m => m.Name == "rd_kafka_AdminOptions_set_match_consumer_group_states").CreateDelegate(typeof(Func)); _NewTopic_new = (Func)methods.Single(m => m.Name == "rd_kafka_NewTopic_new").CreateDelegate(typeof(Func)); _NewTopic_destroy = (Action)methods.Single(m => m.Name == "rd_kafka_NewTopic_destroy").CreateDelegate(typeof(Action)); @@ -357,6 +367,44 @@ static bool SetDelegates(Type nativeMethodsClass) _DeleteAcls_result_response_error = (_DeleteAcls_result_response_error_delegate)methods.Single(m => m.Name == "rd_kafka_DeleteAcls_result_response_error").CreateDelegate(typeof(_DeleteAcls_result_response_error_delegate)); _DeleteAcls_result_response_matching_acls = (_DeleteAcls_result_response_matching_acls_delegate)methods.Single(m => m.Name == "rd_kafka_DeleteAcls_result_response_matching_acls").CreateDelegate(typeof(_DeleteAcls_result_response_matching_acls_delegate)); + _AlterConsumerGroupOffsets_new = (_AlterConsumerGroupOffsets_new_delegate)methods.Single(m => m.Name == "rd_kafka_AlterConsumerGroupOffsets_new").CreateDelegate(typeof(_AlterConsumerGroupOffsets_new_delegate)); + _AlterConsumerGroupOffsets_destroy = (_AlterConsumerGroupOffsets_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_AlterConsumerGroupOffsets_destroy").CreateDelegate(typeof(_AlterConsumerGroupOffsets_destroy_delegate)); + _AlterConsumerGroupOffsets_result_groups = (_AlterConsumerGroupOffsets_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_AlterConsumerGroupOffsets_result_groups").CreateDelegate(typeof(_AlterConsumerGroupOffsets_result_groups_delegate)); + _AlterConsumerGroupOffsets = (_AlterConsumerGroupOffsets_delegate)methods.Single(m => m.Name == "rd_kafka_AlterConsumerGroupOffsets").CreateDelegate(typeof(_AlterConsumerGroupOffsets_delegate)); + + _ListConsumerGroupOffsets_new = (_ListConsumerGroupOffsets_new_delegate)methods.Single(m => m.Name == 
"rd_kafka_ListConsumerGroupOffsets_new").CreateDelegate(typeof(_ListConsumerGroupOffsets_new_delegate)); + _ListConsumerGroupOffsets_destroy = (_ListConsumerGroupOffsets_destroy_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroupOffsets_destroy").CreateDelegate(typeof(_ListConsumerGroupOffsets_destroy_delegate)); + _ListConsumerGroupOffsets_result_groups = (_ListConsumerGroupOffsets_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroupOffsets_result_groups").CreateDelegate(typeof(_ListConsumerGroupOffsets_result_groups_delegate)); + _ListConsumerGroupOffsets = (_ListConsumerGroupOffsets_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroupOffsets").CreateDelegate(typeof(_ListConsumerGroupOffsets_delegate)); + + _ListConsumerGroups = (_ListConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups").CreateDelegate(typeof (_ListConsumerGroups_delegate)); + _ConsumerGroupListing_group_id = (_ConsumerGroupListing_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_group_id").CreateDelegate(typeof (_ConsumerGroupListing_group_id_delegate)); + _ConsumerGroupListing_is_simple_consumer_group = (_ConsumerGroupListing_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupListing_is_simple_consumer_group_delegate)); + _ConsumerGroupListing_state = (_ConsumerGroupListing_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupListing_state").CreateDelegate(typeof (_ConsumerGroupListing_state_delegate)); + _ListConsumerGroups_result_valid = (_ListConsumerGroups_result_valid_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_valid").CreateDelegate(typeof (_ListConsumerGroups_result_valid_delegate)); + _ListConsumerGroups_result_errors = (_ListConsumerGroups_result_errors_delegate)methods.Single(m => m.Name == "rd_kafka_ListConsumerGroups_result_errors").CreateDelegate(typeof (_ListConsumerGroups_result_errors_delegate)); + + _DescribeConsumerGroups = (_DescribeConsumerGroups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups").CreateDelegate(typeof (_DescribeConsumerGroups_delegate)); + _DescribeConsumerGroups_result_groups = (_DescribeConsumerGroups_result_groups_delegate)methods.Single(m => m.Name == "rd_kafka_DescribeConsumerGroups_result_groups").CreateDelegate(typeof (_DescribeConsumerGroups_result_groups_delegate)); + _ConsumerGroupDescription_group_id = (_ConsumerGroupDescription_group_id_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_group_id").CreateDelegate(typeof (_ConsumerGroupDescription_group_id_delegate)); + _ConsumerGroupDescription_error = (_ConsumerGroupDescription_error_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_error").CreateDelegate(typeof (_ConsumerGroupDescription_error_delegate)); + _ConsumerGroupDescription_is_simple_consumer_group = (_ConsumerGroupDescription_is_simple_consumer_group_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_is_simple_consumer_group").CreateDelegate(typeof (_ConsumerGroupDescription_is_simple_consumer_group_delegate)); + _ConsumerGroupDescription_partition_assignor = (_ConsumerGroupDescription_partition_assignor_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_partition_assignor").CreateDelegate(typeof (_ConsumerGroupDescription_partition_assignor_delegate)); + 
_ConsumerGroupDescription_state = (_ConsumerGroupDescription_state_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_state").CreateDelegate(typeof (_ConsumerGroupDescription_state_delegate)); + _ConsumerGroupDescription_coordinator = (_ConsumerGroupDescription_coordinator_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_coordinator").CreateDelegate(typeof (_ConsumerGroupDescription_coordinator_delegate)); + _ConsumerGroupDescription_member_count = (_ConsumerGroupDescription_member_count_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member_count").CreateDelegate(typeof (_ConsumerGroupDescription_member_count_delegate)); + _ConsumerGroupDescription_member = (_ConsumerGroupDescription_member_delegate)methods.Single(m => m.Name == "rd_kafka_ConsumerGroupDescription_member").CreateDelegate(typeof (_ConsumerGroupDescription_member_delegate)); + _MemberDescription_client_id = (_MemberDescription_client_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_client_id").CreateDelegate(typeof (_MemberDescription_client_id_delegate)); + _MemberDescription_group_instance_id = (_MemberDescription_group_instance_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_group_instance_id").CreateDelegate(typeof (_MemberDescription_group_instance_id_delegate)); + _MemberDescription_consumer_id = (_MemberDescription_consumer_id_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_consumer_id").CreateDelegate(typeof (_MemberDescription_consumer_id_delegate)); + _MemberDescription_host = (_MemberDescription_host_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_host").CreateDelegate(typeof (_MemberDescription_host_delegate)); + _MemberDescription_assignment = (_MemberDescription_assignment_delegate)methods.Single(m => m.Name == "rd_kafka_MemberDescription_assignment").CreateDelegate(typeof (_MemberDescription_assignment_delegate)); + _MemberAssignment_partitions = (_MemberAssignment_partitions_delegate)methods.Single(m => m.Name == "rd_kafka_MemberAssignment_partitions").CreateDelegate(typeof (_MemberAssignment_partitions_delegate)); + _Node_id = (_Node_id_delegate)methods.Single(m => m.Name == "rd_kafka_Node_id").CreateDelegate(typeof (_Node_id_delegate)); + _Node_host = (_Node_host_delegate)methods.Single(m => m.Name == "rd_kafka_Node_host").CreateDelegate(typeof (_Node_host_delegate)); + _Node_port = (_Node_port_delegate)methods.Single(m => m.Name == "rd_kafka_Node_port").CreateDelegate(typeof (_Node_port_delegate)); + + _topic_result_error = (Func)methods.Single(m => m.Name == "rd_kafka_topic_result_error").CreateDelegate(typeof(Func)); _topic_result_error_string = (Func)methods.Single(m => m.Name == "rd_kafka_topic_result_error_string").CreateDelegate(typeof(Func)); _topic_result_name = (Func)methods.Single(m => m.Name == "rd_kafka_topic_result_name").CreateDelegate(typeof(Func)); @@ -1150,6 +1198,14 @@ internal static IntPtr brokers_add(IntPtr rk, string brokerlist) IntPtr options, IntPtr opaque) => _AdminOptions_set_opaque(options, opaque); + private static Func _AdminOptions_set_require_stable_offsets; + internal static IntPtr AdminOptions_set_require_stable_offsets( + IntPtr options, + IntPtr true_or_false) => _AdminOptions_set_require_stable_offsets(options, true_or_false); + + private static Func _AdminOptions_set_match_consumer_group_states; + internal static IntPtr AdminOptions_set_match_consumer_group_states(IntPtr options, ConsumerGroupState[] states, UIntPtr 
statesCnt) + => _AdminOptions_set_match_consumer_group_states(options, states, statesCnt); private static Func _NewTopic_new; internal static IntPtr NewTopic_new( @@ -1594,6 +1650,186 @@ IntPtr resultResponse out UIntPtr matchingAclsCntp ) => _DeleteAcls_result_response_matching_acls(resultResponse, out matchingAclsCntp); + + private delegate IntPtr _AlterConsumerGroupOffsets_new_delegate(string group, IntPtr partitions); + private static _AlterConsumerGroupOffsets_new_delegate _AlterConsumerGroupOffsets_new; + internal static IntPtr AlterConsumerGroupOffsets_new(string group, IntPtr partitions) + => _AlterConsumerGroupOffsets_new(group, partitions); + + private delegate void _AlterConsumerGroupOffsets_destroy_delegate(IntPtr groupPartitions); + private static _AlterConsumerGroupOffsets_destroy_delegate _AlterConsumerGroupOffsets_destroy; + internal static void AlterConsumerGroupOffsets_destroy(IntPtr groupPartitions) + => _AlterConsumerGroupOffsets_destroy(groupPartitions); + + private delegate void _AlterConsumerGroupOffsets_delegate(IntPtr handle, IntPtr[] alterGroupsPartitions, UIntPtr alterGroupsPartitionsSize, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _AlterConsumerGroupOffsets_delegate _AlterConsumerGroupOffsets; + internal static void AlterConsumerGroupOffsets( + IntPtr handle, + IntPtr[] alterGroupsPartitions, + UIntPtr alterGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr) => _AlterConsumerGroupOffsets(handle, alterGroupsPartitions, alterGroupsPartitionsSize, optionsPtr, resultQueuePtr); + + private delegate IntPtr _AlterConsumerGroupOffsets_result_groups_delegate(IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + private static _AlterConsumerGroupOffsets_result_groups_delegate _AlterConsumerGroupOffsets_result_groups; + internal static IntPtr AlterConsumerGroupOffsets_result_groups( + IntPtr resultResponse, + out UIntPtr groupsTopicPartitionsCount + ) => _AlterConsumerGroupOffsets_result_groups(resultResponse, out groupsTopicPartitionsCount); + + + private delegate IntPtr _ListConsumerGroupOffsets_new_delegate(string group, IntPtr partitions); + private static _ListConsumerGroupOffsets_new_delegate _ListConsumerGroupOffsets_new; + internal static IntPtr ListConsumerGroupOffsets_new(string group, IntPtr partitions) + => _ListConsumerGroupOffsets_new(group, partitions); + + private delegate void _ListConsumerGroupOffsets_destroy_delegate(IntPtr groupPartitions); + private static _ListConsumerGroupOffsets_destroy_delegate _ListConsumerGroupOffsets_destroy; + internal static void ListConsumerGroupOffsets_destroy(IntPtr groupPartitions) + => _ListConsumerGroupOffsets_destroy(groupPartitions); + + private delegate void _ListConsumerGroupOffsets_delegate( + IntPtr handle, IntPtr[] listGroupsPartitions, UIntPtr listGroupsPartitionsSize, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _ListConsumerGroupOffsets_delegate _ListConsumerGroupOffsets; + internal static void ListConsumerGroupOffsets( + IntPtr handle, + IntPtr[] listGroupsPartitions, + UIntPtr listGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr) => _ListConsumerGroupOffsets(handle, listGroupsPartitions, listGroupsPartitionsSize, optionsPtr, resultQueuePtr); + + private delegate IntPtr _ListConsumerGroupOffsets_result_groups_delegate(IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + private static _ListConsumerGroupOffsets_result_groups_delegate _ListConsumerGroupOffsets_result_groups; + internal static IntPtr 
ListConsumerGroupOffsets_result_groups( + IntPtr resultResponse, + out UIntPtr groupsTopicPartitionsCount + ) => _ListConsumerGroupOffsets_result_groups(resultResponse, out groupsTopicPartitionsCount); + + private delegate void _ListConsumerGroups_delegate(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _ListConsumerGroups_delegate _ListConsumerGroups; + internal static void ListConsumerGroups(IntPtr handle, IntPtr optionsPtr, IntPtr resultQueuePtr) + => _ListConsumerGroups(handle, optionsPtr, resultQueuePtr); + + private delegate IntPtr _ConsumerGroupListing_group_id_delegate(IntPtr grplist); + private static _ConsumerGroupListing_group_id_delegate _ConsumerGroupListing_group_id; + internal static IntPtr ConsumerGroupListing_group_id(IntPtr grplist) + => _ConsumerGroupListing_group_id(grplist); + + private delegate IntPtr _ConsumerGroupListing_is_simple_consumer_group_delegate(IntPtr grplist); + private static _ConsumerGroupListing_is_simple_consumer_group_delegate _ConsumerGroupListing_is_simple_consumer_group; + internal static IntPtr ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist) + => _ConsumerGroupListing_is_simple_consumer_group(grplist); + + private delegate ConsumerGroupState _ConsumerGroupListing_state_delegate(IntPtr grplist); + private static _ConsumerGroupListing_state_delegate _ConsumerGroupListing_state; + internal static ConsumerGroupState ConsumerGroupListing_state(IntPtr grplist) + => _ConsumerGroupListing_state(grplist); + + private delegate IntPtr _ListConsumerGroups_result_valid_delegate(IntPtr result, out UIntPtr cntp); + private static _ListConsumerGroups_result_valid_delegate _ListConsumerGroups_result_valid; + internal static IntPtr ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp) + => _ListConsumerGroups_result_valid(result, out cntp); + + private delegate IntPtr _ListConsumerGroups_result_errors_delegate(IntPtr result, out UIntPtr cntp); + private static _ListConsumerGroups_result_errors_delegate _ListConsumerGroups_result_errors; + internal static IntPtr ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp) + => _ListConsumerGroups_result_errors(result, out cntp); + + private delegate void _DescribeConsumerGroups_delegate( + IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr); + private static _DescribeConsumerGroups_delegate _DescribeConsumerGroups; + internal static void DescribeConsumerGroups( + IntPtr handle, [MarshalAs(UnmanagedType.LPArray)] string[] groups, UIntPtr groupsCnt, IntPtr optionsPtr, IntPtr resultQueuePtr) + => _DescribeConsumerGroups(handle, groups, groupsCnt, optionsPtr, resultQueuePtr); + + private delegate IntPtr _DescribeConsumerGroups_result_groups_delegate(IntPtr result, out UIntPtr cntp); + private static _DescribeConsumerGroups_result_groups_delegate _DescribeConsumerGroups_result_groups; + internal static IntPtr DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp) + => _DescribeConsumerGroups_result_groups(result, out cntp); + + private delegate IntPtr _ConsumerGroupDescription_group_id_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_group_id_delegate _ConsumerGroupDescription_group_id; + internal static IntPtr ConsumerGroupDescription_group_id(IntPtr grpdesc) + => _ConsumerGroupDescription_group_id(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_error_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_error_delegate 
_ConsumerGroupDescription_error; + internal static IntPtr ConsumerGroupDescription_error(IntPtr grpdesc) + => _ConsumerGroupDescription_error(grpdesc); + + private delegate int _ConsumerGroupDescription_is_simple_consumer_group_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_is_simple_consumer_group_delegate _ConsumerGroupDescription_is_simple_consumer_group; + internal static int ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc) + => _ConsumerGroupDescription_is_simple_consumer_group(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_partition_assignor_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_partition_assignor_delegate _ConsumerGroupDescription_partition_assignor; + internal static IntPtr ConsumerGroupDescription_partition_assignor(IntPtr grpdesc) + => _ConsumerGroupDescription_partition_assignor(grpdesc); + + private delegate ConsumerGroupState _ConsumerGroupDescription_state_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_state_delegate _ConsumerGroupDescription_state; + internal static ConsumerGroupState ConsumerGroupDescription_state(IntPtr grpdesc) { + return _ConsumerGroupDescription_state(grpdesc); + } + + private delegate IntPtr _ConsumerGroupDescription_coordinator_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_coordinator_delegate _ConsumerGroupDescription_coordinator; + internal static IntPtr ConsumerGroupDescription_coordinator(IntPtr grpdesc) + => _ConsumerGroupDescription_coordinator(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_member_count_delegate(IntPtr grpdesc); + private static _ConsumerGroupDescription_member_count_delegate _ConsumerGroupDescription_member_count; + internal static IntPtr ConsumerGroupDescription_member_count(IntPtr grpdesc) + => _ConsumerGroupDescription_member_count(grpdesc); + + private delegate IntPtr _ConsumerGroupDescription_member_delegate(IntPtr grpdesc, IntPtr idx); + private static _ConsumerGroupDescription_member_delegate _ConsumerGroupDescription_member; + internal static IntPtr ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx) + => _ConsumerGroupDescription_member(grpdesc, idx); + + private delegate IntPtr _MemberDescription_client_id_delegate(IntPtr member); + private static _MemberDescription_client_id_delegate _MemberDescription_client_id; + internal static IntPtr MemberDescription_client_id(IntPtr member) + => _MemberDescription_client_id(member); + + private delegate IntPtr _MemberDescription_group_instance_id_delegate(IntPtr member); + private static _MemberDescription_group_instance_id_delegate _MemberDescription_group_instance_id; + internal static IntPtr MemberDescription_group_instance_id(IntPtr member) + => _MemberDescription_group_instance_id(member); + + private delegate IntPtr _MemberDescription_consumer_id_delegate(IntPtr member); + private static _MemberDescription_consumer_id_delegate _MemberDescription_consumer_id; + internal static IntPtr MemberDescription_consumer_id(IntPtr member) + => _MemberDescription_consumer_id(member); + + private delegate IntPtr _MemberDescription_host_delegate(IntPtr member); + private static _MemberDescription_host_delegate _MemberDescription_host; + internal static IntPtr MemberDescription_host(IntPtr member) + => _MemberDescription_host(member); + + private delegate IntPtr _MemberDescription_assignment_delegate(IntPtr member); + private static _MemberDescription_assignment_delegate _MemberDescription_assignment; + internal static IntPtr 
MemberDescription_assignment(IntPtr member) + => _MemberDescription_assignment(member); + + private delegate IntPtr _MemberAssignment_partitions_delegate(IntPtr assignment); + private static _MemberAssignment_partitions_delegate _MemberAssignment_partitions; + internal static IntPtr MemberAssignment_topic_partitions(IntPtr assignment) + => _MemberAssignment_partitions(assignment); + + private delegate IntPtr _Node_id_delegate(IntPtr node); + private static _Node_id_delegate _Node_id; + internal static IntPtr Node_id(IntPtr node) => _Node_id(node); + + private delegate IntPtr _Node_host_delegate(IntPtr node); + private static _Node_host_delegate _Node_host; + internal static IntPtr Node_host(IntPtr node) => _Node_host(node); + + private delegate IntPtr _Node_port_delegate(IntPtr node); + private static _Node_port_delegate _Node_port; + internal static IntPtr Node_port(IntPtr node) => _Node_port(node); + private static Func _topic_result_error; internal static ErrorCode topic_result_error(IntPtr topicres) => _topic_result_error(topicres); diff --git a/src/Confluent.Kafka/Impl/Metadata.cs b/src/Confluent.Kafka/Impl/Metadata.cs index 40958825a..c045ed083 100644 --- a/src/Confluent.Kafka/Impl/Metadata.cs +++ b/src/Confluent.Kafka/Impl/Metadata.cs @@ -69,6 +69,7 @@ struct rd_kafka_group_member_info internal IntPtr member_metadata_size; internal IntPtr member_assignment; internal IntPtr member_assignment_size; + internal IntPtr member_assignment_toppars; }; [StructLayout(LayoutKind.Sequential)] diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs index aa655b653..1a4c2dfd7 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods.cs @@ -30,7 +30,7 @@ namespace Confluent.Kafka.Impl.NativeMethods /// This copy/pasting is required because DllName must be const. /// TODO: generate the NativeMethods classes at runtime (compile C# code) rather /// than copy/paste. - /// + /// /// Alternatively, we could have used dlopen to load the native library, but to /// do that we need to know the absolute path of the native libraries because the /// dlopen call does not know .NET runtime library storage conventions. 
Unfortunately @@ -504,6 +504,17 @@ internal class NativeMethods IntPtr opaque); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_require_stable_offsets( + IntPtr options, + IntPtr true_or_false); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( + IntPtr options, + ConsumerGroupState[] states, + UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -899,6 +910,127 @@ internal class NativeMethods /* size_t * */ out UIntPtr cntp ); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets( + IntPtr handle, + IntPtr[] listGroupsPartitions, + UIntPtr listGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets( + IntPtr handle, + IntPtr[] alterGroupsPartitions, + UIntPtr alterGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroups( + IntPtr handle, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_group_id(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, 
CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_DescribeConsumerGroups( + IntPtr handle, + [MarshalAs(UnmanagedType.LPArray)] string[] groups, + UIntPtr groupsCnt, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_group_id(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_error(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_partition_assignor(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member_count(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_client_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_group_instance_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_consumer_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_host(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_id(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_host(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_port(IntPtr node); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_topic_result_error(IntPtr topicres); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs index 943842a3f..13ee62f48 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs +++ 
b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Alpine.cs @@ -27,13 +27,13 @@ namespace Confluent.Kafka.Impl.NativeMethods /// for the DllName const. /// /// - /// This copy/pasting is required because DllName must be const. + /// This copy/pasting is required because DllName must be const. /// TODO: generate the NativeMethods classes at runtime (compile C# code) rather /// than copy/paste. - /// - /// Alternatively, we could have used dlopen to load the native library, but to + /// + /// Alternatively, we could have used dlopen to load the native library, but to /// do that we need to know the absolute path of the native libraries because the - /// dlopen call does not know .NET runtime library storage conventions. Unfortunately + /// dlopen call does not know .NET runtime library storage conventions. Unfortunately /// these are relatively complex, so we prefer to go with the copy/paste solution /// which is relatively simple. /// @@ -508,6 +508,17 @@ internal class NativeMethods_Alpine IntPtr opaque); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_require_stable_offsets( + IntPtr options, + IntPtr true_or_false); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( + IntPtr options, + ConsumerGroupState[] states, + UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -577,7 +588,8 @@ internal class NativeMethods_Alpine [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_DeleteGroup_t * */ IntPtr rd_kafka_DeleteGroup_new( - [MarshalAs(UnmanagedType.LPStr)] string group); + [MarshalAs(UnmanagedType.LPStr)] string group + ); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_DeleteGroup_destroy( @@ -902,6 +914,127 @@ internal class NativeMethods_Alpine /* size_t * */ out UIntPtr cntp ); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets( + IntPtr handle, + IntPtr[] listGroupsPartitions, + UIntPtr listGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_result_groups( + IntPtr 
resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets( + IntPtr handle, + IntPtr[] alterGroupsPartitions, + UIntPtr alterGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroups( + IntPtr handle, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_group_id(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_DescribeConsumerGroups( + IntPtr handle, + [MarshalAs(UnmanagedType.LPArray)] string[] groups, + UIntPtr groupsCnt, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_group_id(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_error(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_partition_assignor(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member_count(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_client_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_group_instance_id(IntPtr member); + + [DllImport(DllName, CallingConvention = 
CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_consumer_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_host(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_id(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_host(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_port(IntPtr node); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_topic_result_error(IntPtr topicres); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs index 2acc384f2..c8eb17f81 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos6.cs @@ -27,13 +27,13 @@ namespace Confluent.Kafka.Impl.NativeMethods /// for the DllName const. /// /// - /// This copy/pasting is required because DllName must be const. + /// This copy/pasting is required because DllName must be const. /// TODO: generate the NativeMethods classes at runtime (compile C# code) rather /// than copy/paste. - /// - /// Alternatively, we could have used dlopen to load the native library, but to + /// + /// Alternatively, we could have used dlopen to load the native library, but to /// do that we need to know the absolute path of the native libraries because the - /// dlopen call does not know .NET runtime library storage conventions. Unfortunately + /// dlopen call does not know .NET runtime library storage conventions. Unfortunately /// these are relatively complex, so we prefer to go with the copy/paste solution /// which is relatively simple. 
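As the class comment notes, these per-platform files are intentionally near-identical copies: only the library name constant differs, because the DllImport attribute requires a compile-time constant. A hypothetical, stripped-down illustration of that layout (the DllName value is made up):

    using System;
    using System.Runtime.InteropServices;

    // Each platform variant repeats the same extern declarations with its own
    // const DllName; only the constant changes between the copies.
    internal class NativeMethods_ExamplePlatform
    {
        public const string DllName = "librdkafka-example";  // hypothetical library name

        [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
        internal static extern IntPtr rd_kafka_Node_host(IntPtr node);

        [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)]
        internal static extern IntPtr rd_kafka_Node_port(IntPtr node);
    }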
/// @@ -508,6 +508,17 @@ internal class NativeMethods_Centos6 IntPtr opaque); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_require_stable_offsets( + IntPtr options, + IntPtr true_or_false); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( + IntPtr options, + ConsumerGroupState[] states, + UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -577,7 +588,8 @@ internal class NativeMethods_Centos6 [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_DeleteGroup_t * */ IntPtr rd_kafka_DeleteGroup_new( - [MarshalAs(UnmanagedType.LPStr)] string group); + [MarshalAs(UnmanagedType.LPStr)] string group + ); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_DeleteGroup_destroy( @@ -902,6 +914,127 @@ internal class NativeMethods_Centos6 /* size_t * */ out UIntPtr cntp ); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets( + IntPtr handle, + IntPtr[] listGroupsPartitions, + UIntPtr listGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets( + IntPtr handle, + IntPtr[] alterGroupsPartitions, + UIntPtr alterGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroups( + IntPtr handle, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_group_id(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern 
ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_DescribeConsumerGroups( + IntPtr handle, + [MarshalAs(UnmanagedType.LPArray)] string[] groups, + UIntPtr groupsCnt, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_group_id(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_error(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_partition_assignor(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member_count(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_client_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_group_instance_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_consumer_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_host(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_id(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_host(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr 
rd_kafka_Node_port(IntPtr node); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_topic_result_error(IntPtr topicres); diff --git a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos7.cs b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos7.cs index c3cb92040..87840655a 100644 --- a/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos7.cs +++ b/src/Confluent.Kafka/Impl/NativeMethods/NativeMethods_Centos7.cs @@ -27,13 +27,13 @@ namespace Confluent.Kafka.Impl.NativeMethods /// for the DllName const. /// /// - /// This copy/pasting is required because DllName must be const. + /// This copy/pasting is required because DllName must be const. /// TODO: generate the NativeMethods classes at runtime (compile C# code) rather /// than copy/paste. - /// - /// Alternatively, we could have used dlopen to load the native library, but to + /// + /// Alternatively, we could have used dlopen to load the native library, but to /// do that we need to know the absolute path of the native libraries because the - /// dlopen call does not know .NET runtime library storage conventions. Unfortunately + /// dlopen call does not know .NET runtime library storage conventions. Unfortunately /// these are relatively complex, so we prefer to go with the copy/paste solution /// which is relatively simple. /// @@ -508,6 +508,17 @@ internal class NativeMethods_Centos7 IntPtr opaque); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_require_stable_offsets( + IntPtr options, + IntPtr true_or_false); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AdminOptions_set_match_consumer_group_states( + IntPtr options, + ConsumerGroupState[] states, + UIntPtr statesCnt); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern IntPtr rd_kafka_NewTopic_new( [MarshalAs(UnmanagedType.LPStr)] string topic, @@ -577,7 +588,8 @@ internal class NativeMethods_Centos7 [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern /* rd_kafka_DeleteGroup_t * */ IntPtr rd_kafka_DeleteGroup_new( - [MarshalAs(UnmanagedType.LPStr)] string group); + [MarshalAs(UnmanagedType.LPStr)] string group + ); [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern void rd_kafka_DeleteGroup_destroy( @@ -902,6 +914,127 @@ internal class NativeMethods_Centos7 /* size_t * */ out UIntPtr cntp ); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroupOffsets( + IntPtr handle, + IntPtr[] listGroupsPartitions, + UIntPtr listGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr 
rd_kafka_AlterConsumerGroupOffsets_new( + [MarshalAs(UnmanagedType.LPStr)] string group, IntPtr partitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets_destroy(IntPtr groupPartitions); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_AlterConsumerGroupOffsets_result_groups( + IntPtr resultResponse, out UIntPtr groupsTopicPartitionsCount); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_AlterConsumerGroupOffsets( + IntPtr handle, + IntPtr[] alterGroupsPartitions, + UIntPtr alterGroupsPartitionsSize, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_ListConsumerGroups( + IntPtr handle, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_group_id(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupListing_is_simple_consumer_group(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupListing_state(IntPtr grplist); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_valid(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ListConsumerGroups_result_errors(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern void rd_kafka_DescribeConsumerGroups( + IntPtr handle, + [MarshalAs(UnmanagedType.LPArray)] string[] groups, + UIntPtr groupsCnt, + IntPtr optionsPtr, + IntPtr resultQueuePtr); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_DescribeConsumerGroups_result_groups(IntPtr result, out UIntPtr cntp); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_group_id(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_error(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern int rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_partition_assignor(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern ConsumerGroupState rd_kafka_ConsumerGroupDescription_state(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_coordinator(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_ConsumerGroupDescription_member_count(IntPtr grpdesc); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr 
rd_kafka_ConsumerGroupDescription_member(IntPtr grpdesc, IntPtr idx); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_client_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_group_instance_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_consumer_id(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_host(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberDescription_assignment(IntPtr member); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_MemberAssignment_partitions(IntPtr assignment); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_id(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_host(IntPtr node); + + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] + internal static extern IntPtr rd_kafka_Node_port(IntPtr node); + [DllImport(DllName, CallingConvention = CallingConvention.Cdecl)] internal static extern ErrorCode rd_kafka_topic_result_error(IntPtr topicres); diff --git a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs index 3f9a56e0f..055fa68d3 100644 --- a/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs +++ b/src/Confluent.Kafka/Impl/SafeKafkaHandle.cs @@ -1173,6 +1173,27 @@ internal static List GetTopicPartitionOffsetErrorList .ToList(); } + /// + /// Creates and returns a List{TopicPartition} from a C rd_kafka_topic_partition_list_t *. + /// + internal static List GetTopicPartitionList(IntPtr listPtr) + { + if (listPtr == IntPtr.Zero) + { + throw new InvalidOperationException("FATAL: Cannot marshal from a NULL ptr."); + } + + var list = Util.Marshal.PtrToStructure(listPtr); + return Enumerable.Range(0, list.cnt) + .Select(i => Util.Marshal.PtrToStructure( + list.elems + i * Util.Marshal.SizeOf())) + .Select(ktp => new TopicPartition( + ktp.topic, + ktp.partition + )) + .ToList(); + } + /// /// Creates and returns a C rd_kafka_topic_partition_list_t * populated by offsets. /// @@ -1207,6 +1228,38 @@ internal static IntPtr GetCTopicPartitionList(IEnumerable return list; } + /// + /// Creates and returns a C rd_kafka_topic_partition_list_t *. 
+ /// + /// + /// If partitions is null, a null IntPtr will be returned, else an IntPtr + /// which must be destroyed with LibRdKafka.topic_partition_list_destroy() + /// + internal static IntPtr GetCTopicPartitionList(IEnumerable partitions) + { + if (partitions == null) + { + return IntPtr.Zero; + } + + IntPtr list = Librdkafka.topic_partition_list_new((IntPtr)partitions.Count()); + if (list == IntPtr.Zero) + { + throw new Exception("Failed to create topic partition list"); + } + + foreach (var p in partitions) + { + if (p.Topic == null) + { + Librdkafka.topic_partition_list_destroy(list); + throw new ArgumentException("Cannot create topic partition list because one or more topics is null."); + } + Librdkafka.topic_partition_list_add(list, p.Topic, p.Partition); + } + + return list; + } static byte[] CopyBytes(IntPtr ptr, IntPtr len) { @@ -1337,6 +1390,26 @@ private void setOption_OperationTimeout(IntPtr optionsPtr, TimeSpan? timeout) } } + private void setOption_RequireStableOffsets(IntPtr optionsPtr, bool requireStable) + { + var rError = Librdkafka.AdminOptions_set_require_stable_offsets(optionsPtr, (IntPtr)(int)(requireStable ? 1 : 0)); + var error = new Error(rError, true); + if (error.Code != ErrorCode.NoError) + { + throw new KafkaException(error); + } + } + + private void setOption_MatchConsumerGroupStates(IntPtr optionsPtr, ConsumerGroupState[] states) + { + var error = Librdkafka.AdminOptions_set_match_consumer_group_states(optionsPtr, states, (UIntPtr)states.Count()); + if (error != IntPtr.Zero) + { + throw new KafkaException(new Error(error, true)); + } + } + private void setOption_completionSource(IntPtr optionsPtr, IntPtr completionSourcePtr) => Librdkafka.AdminOptions_set_opaque(optionsPtr, completionSourcePtr); @@ -1976,6 +2049,216 @@ private static void Validate(AclBindingFilter aclBindingFilter) } } + internal void AlterConsumerGroupOffsets( + IEnumerable groupsPartitions, + AlterConsumerGroupOffsetsOptions options, + IntPtr resultQueuePtr, + IntPtr completionSourcePtr) + { + ThrowIfHandleClosed(); + + // For now, we only support one group at a time, given as a single element of groupsPartitions. + // The code has been written so that only this if-guard needs to be removed when we add support for + // multiple ConsumerGroupTopicPartitionOffsets. + if (groupsPartitions.Count() != 1) + { + throw new ArgumentException("Can only alter offsets for one group at a time"); + } + + IntPtr optionsPtr = IntPtr.Zero; + IntPtr[] groupsPartitionsPtrs = new IntPtr[groupsPartitions.Count()]; + + try + { + // Set admin options, if any. + options = options ?? new AlterConsumerGroupOffsetsOptions(); + optionsPtr = Librdkafka.AdminOptions_new(handle, Librdkafka.AdminOp.AlterConsumerGroupOffsets); + setOption_RequestTimeout(optionsPtr, options.RequestTimeout); + setOption_completionSource(optionsPtr, completionSourcePtr); + + // Create the objects required by librdkafka to call the method. + var idx = 0; + foreach (var groupPartitions in groupsPartitions) + { + if (groupPartitions == null) + { + throw new ArgumentException("Cannot alter consumer group offsets for a null group"); + } + + // Create C list of topic partitions.
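+ // (The temporary C list is destroyed immediately after the rd_kafka_AlterConsumerGroupOffsets_new()
+ // call below, which copies the partitions into the new request object; the request objects
+ // themselves are destroyed in the finally block.)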
+ var list = SafeKafkaHandle.GetCTopicPartitionList(groupPartitions.TopicPartitionOffsets); + + groupsPartitionsPtrs[idx] = Librdkafka.AlterConsumerGroupOffsets_new(groupPartitions.Group, list); + idx++; + + if (list != IntPtr.Zero) + { + Librdkafka.topic_partition_list_destroy(list); + } + } + + + Librdkafka.AlterConsumerGroupOffsets(handle, groupsPartitionsPtrs, (UIntPtr)(uint)groupsPartitionsPtrs.Count(), optionsPtr, resultQueuePtr); + } + finally + { + // Clean up the options if created. + if (optionsPtr != IntPtr.Zero) + { + Librdkafka.AdminOptions_destroy(optionsPtr); + } + + // Clean up the groupPartitionPtr objects if created. + // Note that the function takes care of cleaning up the topic partition list inside the object. + foreach (var groupPartitionPtr in groupsPartitionsPtrs) + { + if (groupPartitionPtr != IntPtr.Zero) + { + Librdkafka.AlterConsumerGroupOffsets_destroy(groupPartitionPtr); + } + } + } + + } + + internal void ListConsumerGroupOffsets( + IEnumerable groupsPartitions, + ListConsumerGroupOffsetsOptions options, + IntPtr resultQueuePtr, + IntPtr completionSourcePtr) + { + ThrowIfHandleClosed(); + + // For now, we only support one group at a time given as a single element of groupsPartitions. + // Code has been written so that only this if-guard needs to be removed when we add support for + // multiple ConsumerGroupTopicPartitions. + if (groupsPartitions.Count() != 1) + { + throw new ArgumentException("Can only list offsets for one group at a time"); + } + + IntPtr optionsPtr = IntPtr.Zero; + IntPtr[] groupsPartitionPtrs = new IntPtr[groupsPartitions.Count()]; + + try + { + // Set admin options if any + options = options ?? new ListConsumerGroupOffsetsOptions(); + optionsPtr = Librdkafka.AdminOptions_new(handle, Librdkafka.AdminOp.ListConsumerGroupOffsets); + setOption_RequestTimeout(optionsPtr, options.RequestTimeout); + setOption_RequireStableOffsets(optionsPtr, options.RequireStableOffsets); + setOption_completionSource(optionsPtr, completionSourcePtr); + + // Create the objects required by librdkafka to call the method. + var idx = 0; + foreach (var groupPartitions in groupsPartitions) + { + if (groupPartitions == null) + { + throw new ArgumentException("Cannot list consumer group offsets for null group"); + } + + // Create C list of topic partitions. + var list = SafeKafkaHandle.GetCTopicPartitionList(groupPartitions.TopicPartitions); + + groupsPartitionPtrs[idx] = Librdkafka.ListConsumerGroupOffsets_new(groupPartitions.Group, list); + idx++; + + if (list != IntPtr.Zero) + { + Librdkafka.topic_partition_list_destroy(list); + } + } + + + Librdkafka.ListConsumerGroupOffsets(handle, groupsPartitionPtrs, (UIntPtr)(uint)groupsPartitionPtrs.Count(), optionsPtr, resultQueuePtr); + } + finally + { + // Clean up the options if created. + if (optionsPtr != IntPtr.Zero) + { + Librdkafka.AdminOptions_destroy(optionsPtr); + } + + // Clean up the groupsPartitionPtr objects if created. + // Note that the function takes care of cleaning up the topic partition list inside the object. + foreach (var groupsPartitionPtr in groupsPartitionPtrs) + { + if (groupsPartitionPtr != IntPtr.Zero) + { + Librdkafka.ListConsumerGroupOffsets_destroy(groupsPartitionPtr); + } + } + } + + } + + internal void ListConsumerGroups( + ListConsumerGroupsOptions options, + IntPtr resultQueuePtr, + IntPtr completionSourcePtr) + { + ThrowIfHandleClosed(); + + IntPtr optionsPtr = IntPtr.Zero; + try + { + // Set Admin Options if any. + options = options ?? 
new ListConsumerGroupsOptions(); + optionsPtr = Librdkafka.AdminOptions_new(handle, Librdkafka.AdminOp.ListConsumerGroups); + setOption_RequestTimeout(optionsPtr, options.RequestTimeout); + if (options.MatchStates != null) + { + setOption_MatchConsumerGroupStates(optionsPtr, options.MatchStates.ToArray()); + } + setOption_completionSource(optionsPtr, completionSourcePtr); + + // Call ListConsumerGroups (async). + Librdkafka.ListConsumerGroups(handle, optionsPtr, resultQueuePtr); + } + finally + { + if (optionsPtr != IntPtr.Zero) + { + Librdkafka.AdminOptions_destroy(optionsPtr); + } + + } + } + + + internal void DescribeConsumerGroups(IEnumerable groups, DescribeConsumerGroupsOptions options, IntPtr resultQueuePtr, IntPtr completionSourcePtr) + { + ThrowIfHandleClosed(); + + if (groups.Count() == 0) { + throw new ArgumentException("at least one group should be provided to DescribeConsumerGroups"); + } + + var optionsPtr = IntPtr.Zero; + try + { + // Set Admin Options if any. + options = options ?? new DescribeConsumerGroupsOptions(); + optionsPtr = Librdkafka.AdminOptions_new(handle, Librdkafka.AdminOp.DescribeConsumerGroups); + setOption_RequestTimeout(optionsPtr, options.RequestTimeout); + setOption_completionSource(optionsPtr, completionSourcePtr); + + // Call DescribeConsumerGroups (async). + Librdkafka.DescribeConsumerGroups( + handle, groups.ToArray(), (UIntPtr)(groups.Count()), + optionsPtr, resultQueuePtr); + } + finally + { + if (optionsPtr != IntPtr.Zero) + { + Librdkafka.AdminOptions_destroy(optionsPtr); + } + } + } + internal void OAuthBearerSetToken(string tokenValue, long lifetimeMs, string principalName, IDictionary extensions) { if (tokenValue == null) throw new ArgumentNullException(nameof(tokenValue)); diff --git a/src/Confluent.Kafka/Node.cs b/src/Confluent.Kafka/Node.cs new file mode 100644 index 000000000..6bd541175 --- /dev/null +++ b/src/Confluent.Kafka/Node.cs @@ -0,0 +1,47 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +namespace Confluent.Kafka +{ + /// + /// Node represents a Kafka broker. + /// + public class Node + { + /// + /// Id represents the Node Id. + /// + public int Id { get; set; } + + /// + /// Host represents the host of the broker. + /// + public string Host { get; set; } + + /// + /// Port represents the port of the broker. + /// + public int Port { get; set; } + + /// + /// Returns a human readable representation of this object. + /// + public override string ToString() + { + return $"Id = {Id}, {Host}:{Port}"; + } + } +} \ No newline at end of file diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_AlterListConsumerGroupOffsets.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_AlterListConsumerGroupOffsets.cs new file mode 100644 index 000000000..b77342977 --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_AlterListConsumerGroupOffsets.cs @@ -0,0 +1,183 @@ +// Copyright 2022 Confluent Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. + +#pragma warning disable xUnit1026 + +using System; +using System.Linq; +using System.Collections.Generic; +using Confluent.Kafka.Admin; +using Xunit; + + +namespace Confluent.Kafka.IntegrationTests +{ + public partial class Tests + { + /// + /// Test functionality of AdminClient.ListConsumerGroupOffsets and + /// AdminClient.AlterConsumerGroupOffsets. + /// + [Theory, MemberData(nameof(KafkaParameters))] + public void AdminClient_AlterListConsumerGroupOffsets(string bootstrapServers) + { + LogToFile("start AdminClient_AlterListConsumerGroupOffsets"); + var numMessages = 5; + var groupID = Guid.NewGuid().ToString(); + + using (var topic = new TemporaryTopic(bootstrapServers, 1)) + { + // This test needs us to first produce and consume from a topic before we can list the offsets. + // 1. Create topic and produce + var producerConfig = new ProducerConfig + { + BootstrapServers = bootstrapServers, + EnableIdempotence = true, + LingerMs = 1.5 + }; + + using (var producer = new ProducerBuilder(producerConfig).Build()) + { + for (int i = 0; i < numMessages; i++) + { + producer.Produce( + new TopicPartition(topic.Name, 0), + new Message { Key = "test key " + i, Value = "test val " + i }); + + } + producer.Flush(TimeSpan.FromSeconds(10)); + } + + + // Create an AdminClient here, to test altering offsets while the consumer is still active. + var adminClient = new AdminClientBuilder(new AdminClientConfig { + BootstrapServers = bootstrapServers, + }).Build(); + + // 2. Consume + var consumerConfig = new ConsumerConfig + { + GroupId = groupID, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, + EnableAutoOffsetStore = false, + EnablePartitionEof = true, + }; + + using (var consumer = + new ConsumerBuilder(consumerConfig).Build()) + { + consumer.Subscribe(topic.Name); + + int msgCnt = 0; + while (true) + { + var record = consumer.Consume(TimeSpan.FromMilliseconds(100)); + if (record == null) { continue; } + if (record.IsPartitionEOF) { break; } + msgCnt += 1; + consumer.StoreOffset(record); + } + + Assert.Equal(numMessages, msgCnt); + consumer.Commit(); + + // Check that we are not able to alter the offsets while the consumer is still active. + var errorOccurred = false; + try + { + var tpoListInvalid = new List(); + tpoListInvalid.Add(new TopicPartitionOffset(topic.Name, 0, 2)); + var _ = adminClient.AlterConsumerGroupOffsetsAsync( + new ConsumerGroupTopicPartitionOffsets[] { + new ConsumerGroupTopicPartitionOffsets(groupID, tpoListInvalid), + }).Result; + } + catch (Exception e) + { + errorOccurred = true; + Assert.IsType(e.InnerException); + } + Assert.True(errorOccurred); + + consumer.Close(); + } + + // 3.
List, Alter and then again List Consumer Group Offsets + var tpList = new List(); + tpList.Add(new TopicPartition(topic.Name, 0)); + var lcgoResults = adminClient.ListConsumerGroupOffsetsAsync( + new ConsumerGroupTopicPartitions[] { + new ConsumerGroupTopicPartitions(groupID, tpList), + }, + new ListConsumerGroupOffsetsOptions() { RequireStableOffsets = false } + ).Result; + + Assert.Single(lcgoResults); + + var groupResultListing = lcgoResults[0]; + Assert.NotNull(groupResultListing); + Assert.Single(groupResultListing.Partitions); + Assert.Equal(topic.Name, groupResultListing.Partitions[0].Topic); + Assert.Equal(0, groupResultListing.Partitions[0].Partition.Value); + Assert.Equal(5, groupResultListing.Partitions[0].Offset); + Assert.False(groupResultListing.Partitions[0].Error.IsError); + + var tpoList = new List(); + tpoList.Add(new TopicPartitionOffset(topic.Name, 0, 2)); + var acgoResults = adminClient.AlterConsumerGroupOffsetsAsync( + new ConsumerGroupTopicPartitionOffsets[] { + new ConsumerGroupTopicPartitionOffsets(groupID, tpoList), + }).Result; + + Assert.Single(acgoResults); + var groupResultAlter = acgoResults[0]; + Assert.NotNull(groupResultAlter); + Assert.Single(groupResultAlter.Partitions); + Assert.Equal(topic.Name, groupResultAlter.Partitions[0].Topic); + Assert.Equal(0, groupResultAlter.Partitions[0].Partition.Value); + Assert.Equal(2, groupResultAlter.Partitions[0].Offset); + Assert.False(groupResultAlter.Partitions[0].Error.IsError); + + tpList = new List(); + tpList.Add(new TopicPartition(topic.Name, 0)); + lcgoResults = adminClient.ListConsumerGroupOffsetsAsync( + new ConsumerGroupTopicPartitions[] { + new ConsumerGroupTopicPartitions(groupID, tpList), + }, + new ListConsumerGroupOffsetsOptions() { RequireStableOffsets = false } + ).Result; + + Assert.Single(lcgoResults); + + groupResultListing = lcgoResults[0]; + Assert.NotNull(groupResultListing); + Assert.Single(groupResultListing.Partitions); + Assert.Equal(topic.Name, groupResultListing.Partitions[0].Topic); + Assert.Equal(0, groupResultListing.Partitions[0].Partition.Value); + Assert.Equal(2, groupResultListing.Partitions[0].Offset); + Assert.False(groupResultListing.Partitions[0].Error.IsError); + + adminClient.Dispose(); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end AdminClient_AlterListConsumerGroupOffsets"); + } + } +} diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs new file mode 100644 index 000000000..8a7277be5 --- /dev/null +++ b/test/Confluent.Kafka.IntegrationTests/Tests/AdminClient_ListDescribeConsumerGroups.cs @@ -0,0 +1,175 @@ +// Copyright 2022 Confluent Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Refer to LICENSE for more information. 
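(For orientation before the integration test that follows: it exercises the new ListConsumerGroupsAsync and DescribeConsumerGroupsAsync operations on the admin client. A minimal standalone usage sketch of that same API is below; the bootstrap address, the "my-group" id and the program wrapper are illustrative assumptions, while the option, result and listing/description member names are the ones the test relies on.)

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;

    class ListDescribeSketch
    {
        static async Task Main()
        {
            using (var admin = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build())
            {
                // List only groups that are currently Stable (KIP-518 state filter).
                var listed = await admin.ListConsumerGroupsAsync(new ListConsumerGroupsOptions
                {
                    MatchStates = new List<ConsumerGroupState> { ConsumerGroupState.Stable },
                    RequestTimeout = TimeSpan.FromSeconds(30),
                });
                foreach (var g in listed.Valid)
                {
                    Console.WriteLine($"{g.GroupId} state={g.State} simple={g.IsSimpleConsumerGroup}");
                }

                // Describe a known group in detail (coordinator, assignor, members).
                var described = await admin.DescribeConsumerGroupsAsync(
                    new List<string> { "my-group" },
                    new DescribeConsumerGroupsOptions { RequestTimeout = TimeSpan.FromSeconds(30) });
                foreach (var d in described.ConsumerGroupDescriptions)
                {
                    Console.WriteLine(
                        $"{d.GroupId}: state={d.State}, assignor={d.PartitionAssignor}, " +
                        $"coordinator={d.Coordinator.Host}, members={d.Members.Count()}");
                }
            }
        }
    }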
+ +#pragma warning disable xUnit1026 + +using System; +using System.Linq; +using System.Collections.Generic; +using Xunit; +using Confluent.Kafka.Admin; + +namespace Confluent.Kafka.IntegrationTests +{ + public partial class Tests + { + // A convenience method to check the resultant ConsumerGroupDescription obtained on describing a group. + private void checkConsumerGroupDescription( + ConsumerGroupDescription desc, ConsumerGroupState state, + string protocol, string groupID, + Dictionary> clientIdToToppars) + { + Assert.Equal(groupID, desc.GroupId); + Assert.Equal(ErrorCode.NoError, desc.Error.Code); + Assert.Equal(state, desc.State); + Assert.Equal(protocol, desc.PartitionAssignor); + // We can't check exactly the Broker information, but we add a + // check for the zero-value of the Host. + Assert.NotEqual("", desc.Coordinator.Host); + Assert.Equal(clientIdToToppars.Count(), desc.Members.Count()); + // We will run all our tests on non-simple consumer groups only. + Assert.False(desc.IsSimpleConsumerGroup); + + foreach (var member in desc.Members) + { + Assert.True(clientIdToToppars.ContainsKey(member.ClientId)); + Assert.True(clientIdToToppars[member.ClientId].SequenceEqual(member.Assignment.TopicPartitions)); + } + } + + /// + /// Test functionality of AdminClient.ListConsumerGroups and + /// AdminClient.DescribeConsumerGroups. We test three cases: + /// 1. One consumer group with one client. + /// 2. One consumer group with two clients. + /// 3. Empty consumer group. + /// + [Theory, MemberData(nameof(KafkaParameters))] + public void AdminClient_ListDescribeConsumerGroups(string bootstrapServers) + { + LogToFile("start AdminClient_ListDescribeConsumerGroups"); + var groupID = Guid.NewGuid().ToString(); + var nonExistentGroupID = Guid.NewGuid().ToString(); + const string clientID1 = "test.client.1"; + const string clientID2 = "test.client.2"; + + // Create an AdminClient here - we need it throughout the test. + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { + BootstrapServers = bootstrapServers }).Build()) + { + var listOptionsWithTimeout = new Admin.ListConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30) }; + var describeOptionsWithTimeout = new Admin.DescribeConsumerGroupsOptions() { RequestTimeout = TimeSpan.FromSeconds(30) }; + + // We should not have any group initially. + var groups = adminClient.ListConsumerGroupsAsync().Result; + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); + + // Ensure that the partitioned topic we are using has exactly two partitions. + Assert.Equal(2, partitionedTopicNumPartitions); + + // 1. One consumer group with one client. + var consumerConfig = new ConsumerConfig + { + GroupId = groupID, + BootstrapServers = bootstrapServers, + SessionTimeoutMs = 6000, + PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range, + ClientId = clientID1, + + }; + var consumer1 = new ConsumerBuilder(consumerConfig).Build(); + consumer1.Subscribe(new string[] { partitionedTopic }); + // Wait for rebalance. 
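+ // (A Consume() call is needed here because the group join and partition assignment are
+ // driven from the consumer's poll loop; once it returns, the group is expected to have
+ // reached the Stable state that the assertions below check for.)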
+ consumer1.Consume(TimeSpan.FromSeconds(10)); + + groups = adminClient.ListConsumerGroupsAsync(listOptionsWithTimeout).Result; + Assert.Single(groups.Valid.Where(group => group.GroupId == groupID)); + Assert.Empty(groups.Valid.Where(group => group.GroupId == nonExistentGroupID)); + var group = groups.Valid.Find(group => group.GroupId == groupID); + Assert.Equal(ConsumerGroupState.Stable, group.State); + Assert.False(group.IsSimpleConsumerGroup); + + var descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID }, + describeOptionsWithTimeout).Result; + var groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + var clientIdToToppars = new Dictionary>(); + clientIdToToppars[clientID1] = new List() { + new TopicPartition(partitionedTopic, 0), + new TopicPartition(partitionedTopic, 1), + }; + checkConsumerGroupDescription( + groupDesc, ConsumerGroupState.Stable, "range", groupID, clientIdToToppars); + + // 2. One consumer group with two clients. + consumerConfig.ClientId = clientID2; + var consumer2 = new ConsumerBuilder(consumerConfig).Build(); + consumer2.Subscribe(new string[] { partitionedTopic }); + + // Wait for rebalance. + var state = ConsumerGroupState.PreparingRebalance; + while (state != ConsumerGroupState.Stable) + { + consumer1.Consume(TimeSpan.FromSeconds(1)); + consumer2.Consume(TimeSpan.FromSeconds(1)); + + descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID }, + describeOptionsWithTimeout).Result; + Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupID)); + groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + state = groupDesc.State; + } + + clientIdToToppars[clientID1] = new List() { + new TopicPartition(partitionedTopic, 0) + }; + clientIdToToppars[clientID2] = new List() { + new TopicPartition(partitionedTopic, 1) + }; + checkConsumerGroupDescription( + groupDesc, ConsumerGroupState.Stable, "range", groupID, clientIdToToppars); + + // 3. Empty consumer group. + consumer1.Close(); + consumer2.Close(); + consumer1.Dispose(); + consumer2.Dispose(); + + + // Check the 'States' option by listing Stable consumer groups, which shouldn't + // include `groupID`. 
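+ // (Both consumers were closed above, so the group has no members left and has moved to
+ // the Empty state, which a Stable-only MatchStates filter excludes.)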
+ groups = adminClient.ListConsumerGroupsAsync(new Admin.ListConsumerGroupsOptions() + { MatchStates = new List() { ConsumerGroupState.Stable }, + RequestTimeout = TimeSpan.FromSeconds(30) }).Result; + Assert.Empty(groups.Valid.Where(group => group.GroupId == groupID)); + + descResult = adminClient.DescribeConsumerGroupsAsync( + new List() { groupID }, + describeOptionsWithTimeout).Result; + Assert.Single(descResult.ConsumerGroupDescriptions.Where(group => group.GroupId == groupID)); + groupDesc = descResult.ConsumerGroupDescriptions.Find(group => group.GroupId == groupID); + clientIdToToppars = new Dictionary>(); + checkConsumerGroupDescription( + groupDesc, ConsumerGroupState.Empty, "", groupID, clientIdToToppars); + } + + Assert.Equal(0, Library.HandleCount); + LogToFile("end AdminClient_ListDescribeConsumerGroups"); + } + } +} diff --git a/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs b/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs index d87552d71..67724028b 100644 --- a/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs +++ b/test/Confluent.Kafka.IntegrationTests/Tests/Tests.cs @@ -29,6 +29,8 @@ public class GlobalFixture : IDisposable { private string bootstrapServers; + public const int partitionedTopicNumPartitions = 2; + public GlobalFixture() { var assemblyPath = typeof(Tests).GetTypeInfo().Assembly.Location; @@ -45,7 +47,7 @@ public GlobalFixture() { adminClient.CreateTopicsAsync(new List { new TopicSpecification { Name = SinglePartitionTopic, NumPartitions = 1, ReplicationFactor = 1 }, - new TopicSpecification { Name = PartitionedTopic, NumPartitions = 2, ReplicationFactor = 1 } + new TopicSpecification { Name = PartitionedTopic, NumPartitions = partitionedTopicNumPartitions, ReplicationFactor = 1 } }).Wait(); } } @@ -78,6 +80,8 @@ public partial class Tests private string singlePartitionTopic; private string partitionedTopic; + public const int partitionedTopicNumPartitions = GlobalFixture.partitionedTopicNumPartitions; + private static List kafkaParameters; private static List oAuthBearerKafkaParameters;
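(A closing usage note: the offsets test earlier in this patch drives ListConsumerGroupOffsetsAsync and AlterConsumerGroupOffsetsAsync, which, per the one-group guard in SafeKafkaHandle, currently accept a single group per call. Below is a minimal sketch under that constraint; the broker address, group id and topic name are placeholders, and, as the test demonstrates, the group must have no active members for the alter call to succeed.)

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;

    class OffsetsSketch
    {
        static async Task Main()
        {
            using (var admin = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = "localhost:9092" }).Build())
            {
                // Read the committed offsets of one group for one partition.
                var listed = await admin.ListConsumerGroupOffsetsAsync(
                    new ConsumerGroupTopicPartitions[]
                    {
                        new ConsumerGroupTopicPartitions("my-group",
                            new List<TopicPartition> { new TopicPartition("my-topic", 0) }),
                    },
                    new ListConsumerGroupOffsetsOptions { RequireStableOffsets = false });
                foreach (var p in listed[0].Partitions)
                {
                    Console.WriteLine($"{p.Topic} [{p.Partition.Value}] committed={p.Offset}");
                }

                // Rewind the same group/partition to offset 0. The target group must not
                // have active members, otherwise the broker rejects the request.
                var altered = await admin.AlterConsumerGroupOffsetsAsync(
                    new ConsumerGroupTopicPartitionOffsets[]
                    {
                        new ConsumerGroupTopicPartitionOffsets("my-group",
                            new List<TopicPartitionOffset> { new TopicPartitionOffset("my-topic", 0, 0) }),
                    });
                Console.WriteLine($"committed offset is now {altered[0].Partitions[0].Offset}");
            }
        }
    }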