-
Notifications
You must be signed in to change notification settings - Fork 838
/
Program.cs
129 lines (117 loc) · 5.32 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.
using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Confluent.Kafka.Examples.AvroGeneric
{
class Program
{
static async Task Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
return;
}
string bootstrapServers = args[0];
string schemaRegistryUrl = args[1];
string topicName = args[2];
string groupName = "avro-generic-example-group";
// var s = (RecordSchema)RecordSchema.Parse(File.ReadAllText("my-schema.json"));
var s = (RecordSchema)RecordSchema.Parse(
@"{
""type"": ""record"",
""name"": ""User"",
""fields"": [
{""name"": ""name"", ""type"": ""string""},
{""name"": ""favorite_number"", ""type"": ""long""},
{""name"": ""favorite_color"", ""type"": ""string""}
]
}"
);
CancellationTokenSource cts = new CancellationTokenSource();
var consumeTask = Task.Run(() =>
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
using (var consumer =
new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// commit final offsets and leave the group.
consumer.Close();
}
}
});
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
long i = 1;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");
try
{
var dr = await producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record });
Console.WriteLine($"produced to: {dr.TopicPartitionOffset}");
}
catch (ProduceException<string, GenericRecord> ex)
{
// In some cases (notably Schema Registry connectivity issues), the InnerException
// of the ProduceException contains additional informatiom pertaining to the root
// cause of the problem. This information is automatically included in the output
// of the ToString() method of the ProduceException, called implicitly in the below.
Console.WriteLine($"error producing message: {ex}");
}
}
}
cts.Cancel();
}
}
}