-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathChannels.linq
142 lines (103 loc) · 3.75 KB
/
Channels.linq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
<Query Kind="Program">
<Namespace>System.Threading.Channels</Namespace>
<Namespace>System.Threading.Tasks</Namespace>
</Query>
/// <summary>
/// Query entry point: runs the three channel demos one after another,
/// waiting for each to finish before starting the next.
/// </summary>
async Task Main()
{
    // Order matters only for the readability of the dumped output.
    Func<Task>[] scenarios =
    {
        SingleProducerSingleConsumer,   // Scenario 1: Single producer, single consumer
        MultiProducerSingleConsumer,    // Scenario 2: Multiple producers, single consumer
        SingleProduceMultipleConsumers, // Scenario 3: Single producer, multiple consumers
    };

    foreach (Func<Task> runScenario in scenarios)
    {
        await runScenario();
    }
}
/// <summary>
/// Scenario 1: one producer and one consumer share an unbounded channel.
/// </summary>
/// <returns>
/// A task that completes once the producer has finished, the channel has been
/// marked complete, and the consumer has drained every remaining message.
/// </returns>
public async Task SingleProducerSingleConsumer()
{
    var channel = Channel.CreateUnbounded<string>();

    // In this example, the consumer keeps up with the producer
    var producer1 = new Producer(channel.Writer, 1, 2000);
    var consumer1 = new Consumer(channel.Reader, 1, 1500);

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    Task producerTask1 = producer1.BeginProducing(); // begin producing

    try
    {
        // Await the producer itself (not a ContinueWith continuation) so that
        // a producer failure propagates to the caller instead of being lost.
        await producerTask1;
    }
    finally
    {
        // Always close the writer, even on failure; otherwise the consumer's
        // WaitToReadAsync loop would never end.
        channel.Writer.Complete();

        // Let the consumer drain whatever is left before this method returns/throws.
        await consumerTask1;
    }
}
/// <summary>
/// Scenario 2: two staggered producers write to one unbounded channel that is
/// read by a single consumer.
/// </summary>
/// <returns>
/// A task that completes once both producers have finished, the channel has been
/// marked complete, and the consumer has drained every remaining message.
/// </returns>
public async Task MultiProducerSingleConsumer()
{
    var channel = Channel.CreateUnbounded<string>();

    // In this example, a single consumer easily keeps up with two producers
    var producer1 = new Producer(channel.Writer, 1, 2000);
    var producer2 = new Producer(channel.Writer, 2, 2000);
    var consumer1 = new Consumer(channel.Reader, 1, 250);

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming

    Task producerTask1 = producer1.BeginProducing();
    await Task.Delay(500); // stagger the producers
    Task producerTask2 = producer2.BeginProducing();

    try
    {
        // Await the producers directly (not a ContinueWith continuation) so that
        // a failure in either one propagates to the caller instead of being lost.
        await Task.WhenAll(producerTask1, producerTask2);
    }
    finally
    {
        // The writer is shared, so it is completed only after BOTH producers are
        // done; doing it in finally guarantees the consumer loop can terminate.
        channel.Writer.Complete();

        // Let the consumer drain whatever is left before this method returns/throws.
        await consumerTask1;
    }
}
/// <summary>
/// Scenario 3: one fast producer writes to an unbounded channel that is read
/// concurrently by three slower consumers.
/// </summary>
/// <returns>
/// A task that completes once the producer has finished, the channel has been
/// marked complete, and all three consumers have stopped reading.
/// </returns>
public async Task SingleProduceMultipleConsumers()
{
    var channel = Channel.CreateUnbounded<string>();

    // In this example, multiple consumers are needed to keep up with a fast producer
    var producer1 = new Producer(channel.Writer, 1, 100);
    var consumer1 = new Consumer(channel.Reader, 1, 1500);
    var consumer2 = new Consumer(channel.Reader, 2, 1500);
    var consumer3 = new Consumer(channel.Reader, 3, 1500);

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
    Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

    Task producerTask1 = producer1.BeginProducing();

    try
    {
        // Await the producer itself (not a ContinueWith continuation) so that
        // a producer failure propagates to the caller instead of being lost.
        await producerTask1;
    }
    finally
    {
        // Always close the writer, even on failure; otherwise the consumers'
        // WaitToReadAsync loops would never end.
        channel.Writer.Complete();

        // Let every consumer drain its share before this method returns/throws.
        await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);
    }
}
/// <summary>
/// Demo producer: writes ten timestamped strings to a channel writer,
/// pausing for a fixed delay before each one.
/// </summary>
internal class Producer
{
    private readonly ChannelWriter<string> _writer;
    private readonly int _identifier;
    private readonly int _delay;

    /// <param name="writer">Channel endpoint the messages are written to.</param>
    /// <param name="identifier">Number used to tag this producer's messages and log lines.</param>
    /// <param name="delay">Pause before each message, passed to <c>Task.Delay</c> (milliseconds).</param>
    public Producer(ChannelWriter<string> writer, int identifier, int delay)
    {
        (_writer, _identifier, _delay) = (writer, identifier, delay);
    }

    /// <summary>
    /// Produces ten messages and then returns. The writer is NOT completed here;
    /// that is left to the caller.
    /// </summary>
    public async Task BeginProducing()
    {
        Log("Starting");

        var remaining = 10;
        while (remaining > 0)
        {
            await Task.Delay(_delay); // simulate producer building/fetching some data

            var payload = $"P{_identifier} - {DateTime.UtcNow:G}";
            Log($"Creating {payload}");
            await _writer.WriteAsync(payload);

            remaining--;
        }

        Log("Completed");
    }

    // Dumps one line prefixed with this producer's identifier under the "Producer" heading.
    private void Log(string text)
    {
        ($"PRODUCER ({_identifier}): {text}").Dump("Producer");
    }
}
/// <summary>
/// Demo consumer: reads strings from a channel reader until the channel is
/// completed, pausing for a fixed delay after each item it takes.
/// </summary>
internal class Consumer
{
    private readonly ChannelReader<string> _reader;
    private readonly int _identifier;
    private readonly int _delay;

    /// <param name="reader">Channel endpoint the messages are read from.</param>
    /// <param name="identifier">Number used to tag this consumer's log lines.</param>
    /// <param name="delay">Pause after each item is taken, passed to <c>Task.Delay</c> (milliseconds).</param>
    public Consumer(ChannelReader<string> reader, int identifier, int delay)
    {
        (_reader, _identifier, _delay) = (reader, identifier, delay);
    }

    /// <summary>
    /// Reads items until <c>WaitToReadAsync</c> reports that no more data will arrive.
    /// </summary>
    public async Task ConsumeData()
    {
        Log("Starting");

        while (await _reader.WaitToReadAsync())
        {
            // Another reader may have taken the item between the wait and the read;
            // in that case just go back to waiting.
            if (!_reader.TryRead(out var item))
            {
                continue;
            }

            await Task.Delay(_delay); // simulate processing time
            Log($"Consuming {item}");
        }

        Log("Completed");
    }

    // Dumps one line prefixed with this consumer's identifier under the "Consumer" heading.
    private void Log(string text)
    {
        ($"CONSUMER ({_identifier}): {text}").Dump("Consumer");
    }
}