Skip to content

Commit

Permalink
[admin] Add consumer group bindings for KIP-88, 222, 518, 396 (parti…
Browse files Browse the repository at this point in the history
…al) (#1981)

* [admin] Add ListConsumerGroupOffsets and AlterConsumerGroupOffsets bindings
* [admin] Add ListConsumerGroups and DescribeConsumerGroups bindings
* Also deprecates the older ListGroups code
* [admin] Add KIP-518 changes to List/Describe Consumer Groups


Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
  • Loading branch information
milindl and emasab committed Jan 23, 2023
1 parent e402072 commit 315c3f5
Show file tree
Hide file tree
Showing 42 changed files with 3,241 additions and 44 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@
- Added `LatestCompatibilityStrict` configuration property to JsonSerializerConfig to check the compatibility with latest schema
when `UseLatestVersion` is set to true.
- Added DeleteConsumerGroupOffset to AdminClient.
- [KIP-222](https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API)
Finish remaining implementation: Add Consumer Group operations to Admin API (`DeleteGroups` is already present).
- [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state)
Allow listing consumer groups per state.
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484)
Partially implemented: support for AlterConsumerGroupOffsets.
- As result of the above KIPs, added (#1981)
- `ListConsumerGroups` Admin operation. Supports listing by state.
- `DescribeConsumerGroups` Admin operation. Supports multiple groups.
- `ListConsumerGroupOffsets` Admin operation. Currently, only supports
1 group with multiple partitions. Supports the `requireStable` option.
- `AlterConsumerGroupOffsets` Admin operation. Currently, only supports
1 group with multiple offsets.

## Fixes

Expand Down
228 changes: 227 additions & 1 deletion examples/AdminClient/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,225 @@ static async Task DeleteAclsAsync(string bootstrapServers, string[] commandArgs)
}
}

static async Task AlterConsumerGroupOffsetsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 4)
{
Console.WriteLine("usage: .. <bootstrapServers> alter-consumer-group-offsets <group_id> <topic1> <partition1> <offset1> ... <topicN> <partitionN> <offsetN>");
Environment.ExitCode = 1;
return;
}

var group = commandArgs[0];
var tpoes = new List<TopicPartitionOffset>();
for (int i = 1; i + 2 < commandArgs.Length; i += 3) {
try
{
var topic = commandArgs[i];
var partition = Int32.Parse(commandArgs[i + 1]);
var offset = Int64.Parse(commandArgs[i + 2]);
tpoes.Add(new TopicPartitionOffset(topic, partition, offset));
}
catch (Exception e)
{
Console.Error.WriteLine($"An error occurred while parsing arguments: {e}");
Environment.ExitCode = 1;
return;
}
}

var input = new List<ConsumerGroupTopicPartitionOffsets>() { new ConsumerGroupTopicPartitionOffsets(group, tpoes) };

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var results = await adminClient.AlterConsumerGroupOffsetsAsync(input);
Console.WriteLine("Successfully altered offsets:");
foreach(var groupResult in results) {
Console.WriteLine(groupResult);
}

}
catch (AlterConsumerGroupOffsetsException e)
{
Console.WriteLine($"An error occurred altering offsets: {(e.Results.Any() ? e.Results[0] : null)}");
Environment.ExitCode = 1;
return;
}
catch (KafkaException e)
{
Console.WriteLine("An error occurred altering consumer group offsets." +
$" Code: {e.Error.Code}" +
$", Reason: {e.Error.Reason}");
Environment.ExitCode = 1;
return;
}
}
}

static async Task ListConsumerGroupOffsetsAsync(string bootstrapServers, string[] commandArgs)
{
if (commandArgs.Length < 1)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-group-offsets <group_id> [<topic1> <partition1> ... <topicN> <partitionN>]");
Environment.ExitCode = 1;
return;
}

var group = commandArgs[0];
var tpes = new List<TopicPartition>();
for (int i = 1; i + 1 < commandArgs.Length; i += 2) {
try
{
var topic = commandArgs[i];
var partition = Int32.Parse(commandArgs[i + 1]);
tpes.Add(new TopicPartition(topic, partition));
}
catch (Exception e)
{
Console.Error.WriteLine($"An error occurred while parsing arguments: {e}");
Environment.ExitCode = 1;
return;
}
}
if(!tpes.Any())
{
// In case the list is empty, request offsets for all the partitions.
tpes = null;
}

var input = new List<ConsumerGroupTopicPartitions>() { new ConsumerGroupTopicPartitions(group, tpes) };

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var result = await adminClient.ListConsumerGroupOffsetsAsync(input);
Console.WriteLine("Successfully listed offsets:");
foreach(var groupResult in result) {
Console.WriteLine(groupResult);
}
}
catch (ListConsumerGroupOffsetsException e)
{
Console.WriteLine($"An error occurred listing offsets: {(e.Results.Any() ? e.Results[0] : null)}");
Environment.ExitCode = 1;
return;
}
catch (KafkaException e)
{
Console.WriteLine("An error occurred listing consumer group offsets." +
$" Code: {e.Error.Code}" +
$", Reason: {e.Error.Reason}");
Environment.ExitCode = 1;
return;
}
}
}

static async Task ListConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) {
var timeout = TimeSpan.FromSeconds(30);
var statesList = new List<ConsumerGroupState>();
try
{
if (commandArgs.Length > 0)
{
timeout = TimeSpan.FromSeconds(Int32.Parse(commandArgs[0]));
}
if (commandArgs.Length > 1)
{
for (int i = 1; i < commandArgs.Length; i++) {
statesList.Add(Enum.Parse<ConsumerGroupState>(commandArgs[i]));
}
}
}
catch (SystemException)
{
Console.WriteLine("usage: .. <bootstrapServers> list-consumer-groups [<timeout_seconds> <match_state_1> <match_state_2> ... <match_state_N>]");
Environment.ExitCode = 1;
return;
}

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var result = await adminClient.ListConsumerGroupsAsync(new ListConsumerGroupsOptions() {
RequestTimeout = timeout,
MatchStates = statesList,
});
Console.WriteLine(result);
}
catch (KafkaException e)
{
Console.WriteLine("An error occurred listing consumer groups." +
$" Code: {e.Error.Code}" +
$", Reason: {e.Error.Reason}");
Environment.ExitCode = 1;
return;
}

}
}


static async Task DescribeConsumerGroupsAsync(string bootstrapServers, string[] commandArgs) {
if (commandArgs.Length < 1)
{
Console.WriteLine("usage: .. <bootstrapServers> describe-consumer-groups <group1> [<group2 ... <groupN>]");
Environment.ExitCode = 1;
return;
}

var groupNames = commandArgs.ToList();
var timeout = TimeSpan.FromSeconds(30);
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
try
{
var descResult = await adminClient.DescribeConsumerGroupsAsync(groupNames, new DescribeConsumerGroupsOptions() { RequestTimeout = timeout });
foreach (var group in descResult.ConsumerGroupDescriptions)
{
Console.WriteLine($" Group: {group.GroupId} {group.Error}");
Console.WriteLine($" Broker: {group.Coordinator}");
Console.WriteLine($" IsSimpleConsumerGroup: {group.IsSimpleConsumerGroup}");
Console.WriteLine($" PartitionAssignor: {group.PartitionAssignor}");
Console.WriteLine($" State: {group.State}");
Console.WriteLine($" Members:");
foreach (var m in group.Members)
{
Console.WriteLine($" {m.ClientId} {m.ConsumerId} {m.Host}");
Console.WriteLine($" Assignment:");
var topicPartitions = "";
if (m.Assignment.TopicPartitions != null)
{
topicPartitions = String.Join(", ", m.Assignment.TopicPartitions.Select(tp => tp.ToString()));
}
Console.WriteLine($" TopicPartitions: [{topicPartitions}]");
}
}
}
catch (KafkaException e)
{
Console.WriteLine($"An error occurred describing consumer groups: {e}");
Environment.ExitCode = 1;
}
}
}

public static async Task Main(string[] args)
{
if (args.Length < 2)
{
Console.WriteLine("usage: .. <bootstrapServers> <list-groups|metadata|library-version|create-topic|create-acls|describe-acls|delete-acls> ..");
Console.WriteLine(
"usage: .. <bootstrapServers> " + String.Join("|", new string[] {
"list-groups", "metadata", "library-version", "create-topic", "create-acls",
"describe-acls", "delete-acls",
"list-consumer-groups", "describe-consumer-groups",
"list-consumer-group-offsets", "alter-consumer-group-offsets"
}) +
" ..");
Environment.ExitCode = 1;
return;
}
Expand Down Expand Up @@ -331,6 +545,18 @@ public static async Task Main(string[] args)
case "delete-acls":
await DeleteAclsAsync(bootstrapServers, commandArgs);
break;
case "alter-consumer-group-offsets":
await AlterConsumerGroupOffsetsAsync(bootstrapServers, commandArgs);
break;
case "list-consumer-group-offsets":
await ListConsumerGroupOffsetsAsync(bootstrapServers, commandArgs);
break;
case "list-consumer-groups":
await ListConsumerGroupsAsync(bootstrapServers, commandArgs);
break;
case "describe-consumer-groups":
await DescribeConsumerGroupsAsync(bootstrapServers, commandArgs);
break;
default:
Console.WriteLine($"unknown command: {command}");
break;
Expand Down
48 changes: 48 additions & 0 deletions src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Collections.Generic;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Represents an error that occured during altering consumer group offsets.
/// </summary>
public class AlterConsumerGroupOffsetsException : KafkaException
{
/// <summary>
/// Initializes a new instance of AlterConsumerGroupOffsetsException.
/// </summary>
/// <param name="results">
/// The result corresponding to all groups/partitions in the request
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </param>
public AlterConsumerGroupOffsetsException(List<AlterConsumerGroupOffsetsReport> results)
: base(new Error(ErrorCode.Local_Partial,
"An error occurred altering consumer group offsets, check individual result elements"))
{
Results = results;
}

/// <summary>
/// The result corresponding to all groups/partitions in the request
/// (whether or not they were in error). At least one of these
/// results will be in error.
/// </summary>
public List<AlterConsumerGroupOffsetsReport> Results { get; }
}
}
36 changes: 36 additions & 0 deletions src/Confluent.Kafka/Admin/AlterConsumerGroupOffsetsOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2022 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System;

namespace Confluent.Kafka.Admin
{
/// <summary>
/// Options for the "AdminClient.AlterConsumerGroupOffsetsAsync" method.
/// </summary>
public class AlterConsumerGroupOffsetsOptions
{
/// <summary>
/// The overall request timeout, including broker lookup, request
/// transmission, operation time on broker, and response. If set
/// to null, the default request timeout for the AdminClient will
/// be used.
///
/// Default: null
/// </summary>
public TimeSpan? RequestTimeout { get; set; }
}
}

0 comments on commit 315c3f5

Please sign in to comment.