forked from ch-robinson/dotnet-avro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
143 lines (119 loc) · 4.93 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
namespace Chr.Avro.DefaultValuesExample
{
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Chr.Avro.Abstract;
using Chr.Avro.Confluent;
using Chr.Avro.DefaultValuesExample.Models;
using Chr.Avro.Representation;
using global::Confluent.Kafka;
using global::Confluent.Kafka.Admin;
using global::Confluent.SchemaRegistry;
using Schema = global::Confluent.SchemaRegistry.Schema;
internal class Program
{
private const string BootstrapServers = "localhost:9092";
private const string SchemaRegistries = "http://localhost:8081";
private const string Topic = "default-values-example";
private static readonly TimeSpan AssignmentTimeout = TimeSpan.FromSeconds(15);
public static async Task<int> Main()
{
using var registryClient = new CachedSchemaRegistryClient(
new SchemaRegistryConfig
{
Url = SchemaRegistries,
});
using var admin = CreateAdmin();
using var consumer = CreateConsumer(registryClient);
using var producer = await CreateProducer(registryClient);
Console.WriteLine($"Creating {Topic}...");
await EnsureTopicExists(admin);
consumer.Subscribe(Topic);
var assignmentSignal = new CancellationTokenSource(AssignmentTimeout);
Console.WriteLine($"Subscribing to {Topic}...");
while (consumer.Assignment.Count < 1)
{
if (assignmentSignal.IsCancellationRequested)
{
Console.Error.WriteLine($"Failed to receive partition assigment for {Topic} within {AssignmentTimeout.TotalSeconds} seconds.");
return 1;
}
await Task.Delay(TimeSpan.FromSeconds(1));
}
var playerV2 = new PlayerV2
{
Id = Guid.NewGuid(),
Nickname = "Todd Bonzalez",
};
await producer.ProduceAsync(Topic, new Message<Guid, PlayerV2>
{
Key = playerV2.Id,
Value = playerV2,
});
var result = consumer.Consume();
var playerV1 = result.Message.Value;
Console.WriteLine($"Received update for {playerV1.Nickname} with health {playerV1.Health}.");
return 0;
}
private static IAdminClient CreateAdmin()
{
return new AdminClientBuilder(
new AdminClientConfig
{
BootstrapServers = BootstrapServers,
})
.Build();
}
private static IConsumer<Guid, PlayerV1> CreateConsumer(
ISchemaRegistryClient registryClient)
{
return new ConsumerBuilder<Guid, PlayerV1>(
new ConsumerConfig
{
BootstrapServers = BootstrapServers,
EnableAutoCommit = false,
GroupId = $"union-type-example-{Guid.NewGuid()}",
})
.SetAvroKeyDeserializer(registryClient)
.SetAvroValueDeserializer(registryClient)
.Build();
}
private static async Task<IProducer<Guid, PlayerV2>> CreateProducer(
ISchemaRegistryClient registryClient)
{
var schemaBuilder = new SchemaBuilder();
var schemaWriter = new JsonSchemaWriter();
var keySchemaId = await registryClient.RegisterSchemaAsync(
SubjectNameStrategy.Topic.ConstructKeySubjectName(Topic),
new Schema(schemaWriter.Write(schemaBuilder.BuildSchema<Guid>()), SchemaType.Avro));
var valueSchemaId = await registryClient.RegisterSchemaAsync(
SubjectNameStrategy.Topic.ConstructValueSubjectName(Topic),
new Schema(schemaWriter.Write(schemaBuilder.BuildSchema<PlayerV1>()), SchemaType.Avro));
var producerBuilder = new ProducerBuilder<Guid, PlayerV2>(
new ProducerConfig
{
BootstrapServers = BootstrapServers,
});
await producerBuilder.SetAvroKeySerializer(registryClient, keySchemaId);
await producerBuilder.SetAvroValueSerializer(registryClient, valueSchemaId);
return producerBuilder.Build();
}
private static async Task EnsureTopicExists(IAdminClient admin)
{
var metadata = admin.GetMetadata(Topic, TimeSpan.FromSeconds(15));
if (!metadata.Topics.Any(m => m.Topic == Topic))
{
await admin.CreateTopicsAsync(
new[]
{
new TopicSpecification
{
Name = Topic,
},
});
}
}
}
}