-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
channelpool.js
138 lines (114 loc) · 3.44 KB
/
channelpool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import { getChannels, toPascalCase } from '../../utils/common';
/**
 * Renders the C# source of the `ChannelPool` service class.
 *
 * The channels returned by `getChannels(asyncapi)` are split on their
 * `isPublish` flag. For every channel one `_channels.Add(...)` statement is
 * emitted into the generated constructor, keyed by the PascalCased
 * `operationId`. Consumer channels additionally pass their `prefetchCount`
 * and `confirm` values to the generated `CreateChannel` call.
 *
 * @param {object} asyncapi - AsyncAPI document handed to `getChannels`.
 * @param {object} params - Template parameters; `params.namespace` is used as
 *   the root of the generated C# namespace.
 * @returns {string} The generated C# file content.
 */
const template = (asyncapi, params) => {
  // Resolve the channel list once and partition it by direction.
  const channels = getChannels(asyncapi);
  const publishers = channels.filter((channel) => channel.isPublish);
  const consumers = channels.filter((channel) => !channel.isPublish);

  // An array interpolated into a template literal is stringified with
  // Array.prototype.toString(), i.e. joined with ",". That would put a stray
  // comma between the generated statements as soon as there is more than one
  // channel, so the statements are joined explicitly with newlines instead.
  const statementSeparator = '\n        ';

  const publisherRegistrations = publishers
    .map(
      (publisher) => `_channels.Add(
            "${toPascalCase(publisher.operationId)}",
            CreateChannel(connection));`
    )
    .join(statementSeparator);

  const consumerRegistrations = consumers
    .map(
      (consumer) => `_channels.Add(
            "${toPascalCase(consumer.operationId)}",
            CreateChannel(
                connection,
                ${consumer.prefetchCount},
                ${consumer.confirm}));`
    )
    .join(statementSeparator);

  return `using System;
using System.Collections.Generic;
using ${params.namespace}.Services.Interfaces;
using RabbitMQ.Client;

namespace ${params.namespace}.Services;

/// <summary>
/// A channel pool for all channels defined in the async api specification
/// </summary>
public class ChannelPool : IChannelPool
{
    private class Channel : IDisposable
    {
        /// <summary>
        /// The confirm mode for the channel
        /// </summary>
        public bool Confirm { get; init; }

        /// <summary>
        /// The prefetch count for the channel
        /// </summary>
        public ushort PrefetchCount { get; init; }

        /// <summary>
        /// The underlying amqp model/channel
        /// </summary>
        public IModel Model { get; init; }

        public void Dispose()
        {
            Model?.Close();
            Model?.Dispose();
        }
    }

    private readonly IConnection _connection;
    private readonly IDictionary<string, Channel> _channels = new Dictionary<string, Channel>();

    private ChannelPool(IConnection connection)
    {
        _connection = connection;

        // creating producer channels
        ${publisherRegistrations}

        // creating consumer channels
        ${consumerRegistrations}
    }

    public static IChannelPool Create(IConnection connection)
    {
        return new ChannelPool(connection);
    }

    public IModel GetChannel(string operationId)
    {
        // check for channel
        if (!_channels.TryGetValue(operationId, out var channel))
        {
            throw new KeyNotFoundException($"No channel found for {operationId}");
        }

        if (!channel.Model.IsClosed)
        {
            return channel.Model;
        }

        // recreate channel if it is closed
        _channels[operationId] = CreateChannel(
            _connection,
            channel.PrefetchCount, // prefetch from x-prefetch-count on channel binding
            channel.Confirm); // confirm from confirm on operation binding

        return _channels[operationId].Model;
    }

    private Channel CreateChannel(
        IConnection connection,
        ushort prefetchCount = 100,
        bool confirm = false)
    {
        var model = connection.CreateModel();

        if (confirm)
        {
            model.ConfirmSelect();
        }

        model.BasicQos(0, prefetchCount, false);

        return new Channel
        {
            PrefetchCount = prefetchCount,
            Confirm = confirm,
            Model = model
        };
    }

    public void Dispose()
    {
        foreach (var (_, channel) in _channels)
        {
            channel?.Dispose();
        }
    }
}`;
};
/**
 * Generator component that produces the C# `ChannelPool` source.
 *
 * @param {object} props
 * @param {object} props.asyncapi - AsyncAPI document; must expose `hasComponents()`.
 * @param {object} props.params - Template parameters forwarded to `template`.
 * @returns {string|null} The rendered source, or `null` when
 *   `asyncapi.hasComponents()` is falsy.
 */
export function ChannelPool({ asyncapi, params }) {
  // Nothing is rendered for documents without a components section.
  return asyncapi.hasComponents() ? template(asyncapi, params) : null;
}