-
Notifications
You must be signed in to change notification settings - Fork 751
/
Program.cs
executable file
·166 lines (144 loc) · 5.89 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;
using System.Text;
namespace EasyNetQ.Hosepipe
{
    /// <summary>
    /// Console entry point for Hosepipe. Parses the command line and dispatches to the
    /// <c>dump</c>, <c>insert</c>, <c>err</c> and <c>retry</c> commands, each of which
    /// delegates the actual work to one of the collaborators passed to the constructor.
    /// </summary>
    public class Program
    {
        private readonly ArgParser argParser;
        private readonly IQueueRetreival queueRetreival;
        private readonly IMessageWriter messageWriter;
        private readonly IMessageReader messageReader;
        private readonly IQueueInsertion queueInsertion;
        private readonly IErrorRetry errorRetry;

        /// <summary>
        /// Creates a program instance wired to the given collaborators.
        /// </summary>
        /// <param name="argParser">Parses the raw command line in <see cref="Start"/>.</param>
        /// <param name="queueRetreival">Source of messages for the <c>dump</c> and <c>err</c> commands.</param>
        /// <param name="messageWriter">Sink for the messages produced by <c>dump</c> and <c>err</c>.</param>
        /// <param name="messageReader">Source of messages for the <c>insert</c> and <c>retry</c> commands.</param>
        /// <param name="queueInsertion">Publishes the messages read by <c>insert</c>.</param>
        /// <param name="errorRetry">Republishes the error messages read by <c>retry</c>.</param>
        public Program(
            ArgParser argParser,
            IQueueRetreival queueRetreival,
            IMessageWriter messageWriter,
            IMessageReader messageReader,
            IQueueInsertion queueInsertion,
            IErrorRetry errorRetry)
        {
            this.argParser = argParser;
            this.queueRetreival = queueRetreival;
            this.messageWriter = messageWriter;
            this.messageReader = messageReader;
            this.queueInsertion = queueInsertion;
            this.errorRetry = errorRetry;
        }

        /// <summary>
        /// Process entry point. Builds a <see cref="Program"/> with the default collaborators,
        /// runs it, and sets the process exit code to 1 when the requested operation failed so
        /// that calling scripts can detect the failure.
        /// </summary>
        /// <param name="args">Raw command-line arguments.</param>
        public static void Main(string[] args)
        {
            // poor man's dependency injection FTW ;)
            var program = new Program(
                new ArgParser(),
                new QueueRetreival(),
                new FileMessageWriter(),
                new MessageReader(),
                new QueueInsertion(),
                new ErrorRetry(new JsonSerializer()));

            // Main returns void, so the exit code has to be reported through Environment.ExitCode.
            if (!program.Run(args))
            {
                Environment.ExitCode = 1;
            }
        }

        /// <summary>
        /// Parses <paramref name="args"/> and executes the requested command, writing progress
        /// and failure information to the console.
        /// </summary>
        /// <param name="args">Raw command-line arguments.</param>
        public void Start(string[] args)
        {
            // The success flag only matters to Main; existing callers of Start see no change.
            Run(args);
        }

        /// <summary>
        /// Does the work of <see cref="Start"/> and reports whether it succeeded.
        /// </summary>
        /// <param name="args">Raw command-line arguments.</param>
        /// <returns>
        /// <c>false</c> when a required argument was missing or an
        /// <c>EasyNetQHosepipeException</c> was caught; otherwise <c>true</c>.
        /// </returns>
        private bool Run(string[] args)
        {
            var arguments = argParser.Parse(args);

            var results = new StringBuilder();
            var succeeded = true;
            var faulted = false;

            // Builds a callback that records a failure message and marks the run as failed.
            Func<string, Action> recordFailure = text => () =>
            {
                results.AppendLine(text);
                succeeded = false;
            };

            // Connection and file options; each key is copied into the parameters when supplied.
            var parameters = new QueueParameters();
            arguments.WithKey("s", a => parameters.HostName = a.Value);
            arguments.WithKey("v", a => parameters.VHost = a.Value);
            arguments.WithKey("u", a => parameters.Username = a.Value);
            arguments.WithKey("p", a => parameters.Password = a.Value);
            arguments.WithKey("o", a => parameters.MessageFilePath = a.Value);

            try
            {
                // NOTE(review): At/WithKey/FailWith are defined elsewhere; as used here the first
                // positional argument appears to select the command and FailWith supplies the
                // fallback when the preceding lookup finds nothing - confirm against ArgParser.
                arguments.At(0, "dump", () => arguments.WithKey("q", a =>
                {
                    parameters.QueueName = a.Value;
                    Dump(parameters);
                }).FailWith(recordFailure("No Queue Name given")));

                arguments.At(0, "insert", () => arguments.WithKey("q", a =>
                {
                    parameters.QueueName = a.Value;
                    Insert(parameters);
                }).FailWith(recordFailure("No Queue Name given")));

                arguments.At(0, "err", () => ErrorDump(parameters));

                arguments.At(0, "retry", () => Retry(parameters));

                arguments.At(0, "?", PrintUsage);

                // print usage if there are no arguments
                arguments.At(0, a => { }).FailWith(PrintUsage);
            }
            catch (EasyNetQHosepipeException easyNetQHosepipeException)
            {
                Console.WriteLine("Operation Failed:");
                Console.WriteLine(easyNetQHosepipeException.Message);
                faulted = true;
            }

            if (!succeeded)
            {
                Console.WriteLine("Operation failed");
                Console.Write(results.ToString());
                Console.WriteLine();
                PrintUsage();
            }

            return succeeded && !faulted;
        }

        /// <summary>
        /// Passes the messages retrieved from the queue named in <paramref name="parameters"/>
        /// to the message writer, then prints how many messages went through.
        /// </summary>
        /// <param name="parameters">Queue name, connection settings and output path.</param>
        private void Dump(QueueParameters parameters)
        {
            var count = 0;

            // WithEach is lazy: count only advances as the writer enumerates the sequence.
            messageWriter.Write(
                WithEach(queueRetreival.GetMessagesFromQueue(parameters), () => count++),
                parameters);

            Console.WriteLine("{0} Messages from queue '{1}'\r\noutput to directory '{2}'",
                count, parameters.QueueName, parameters.MessageFilePath);
        }

        /// <summary>
        /// Passes the messages produced by the message reader to the queue insertion
        /// collaborator, then prints how many messages went through.
        /// </summary>
        /// <param name="parameters">Queue name, connection settings and input path.</param>
        private void Insert(QueueParameters parameters)
        {
            var count = 0;

            // WithEach is lazy: count only advances as the publisher enumerates the sequence.
            queueInsertion.PublishMessagesToQueue(
                WithEach(messageReader.ReadMessages(parameters), () => count++),
                parameters);

            Console.WriteLine("{0} Messages from directory '{1}'\r\ninserted into queue '{2}'",
                count, parameters.MessageFilePath, parameters.QueueName);
        }

        /// <summary>
        /// Runs <see cref="Dump"/> against the queue named by
        /// <c>DefaultConsumerErrorStrategy.EasyNetQErrorQueue</c>.
        /// </summary>
        /// <param name="parameters">Connection settings and output path; its queue name is overwritten.</param>
        private void ErrorDump(QueueParameters parameters)
        {
            parameters.QueueName = DefaultConsumerErrorStrategy.EasyNetQErrorQueue;
            Dump(parameters);
        }

        /// <summary>
        /// Reads the messages stored for the error queue and hands them to the error retry
        /// collaborator, then prints how many messages went through.
        /// </summary>
        /// <param name="parameters">Connection settings and input path.</param>
        private void Retry(QueueParameters parameters)
        {
            var count = 0;

            // WithEach is lazy: count only advances as RetryErrors enumerates the sequence.
            errorRetry.RetryErrors(
                WithEach(
                    messageReader.ReadMessages(parameters, DefaultConsumerErrorStrategy.EasyNetQErrorQueue),
                    () => count++),
                parameters);

            Console.WriteLine("{0} Error messages from directory '{1}' republished",
                count, parameters.MessageFilePath);
        }

        /// <summary>
        /// Lazily re-yields <paramref name="messages"/>, invoking <paramref name="action"/>
        /// immediately before each element is handed to the consumer.
        /// </summary>
        /// <param name="messages">Sequence to pass through unchanged.</param>
        /// <param name="action">Callback run once per element, before it is yielded.</param>
        /// <returns>The same elements in the same order.</returns>
        private IEnumerable<string> WithEach(IEnumerable<string> messages, Action action)
        {
            foreach (var message in messages)
            {
                action();
                yield return message;
            }
        }

        /// <summary>
        /// Writes the embedded <c>EasyNetQ.Hosepipe.Usage.txt</c> resource to the console.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The usage resource is not present in the executing assembly.
        /// </exception>
        public static void PrintUsage()
        {
            using (var manifest = Assembly.GetExecutingAssembly().GetManifestResourceStream("EasyNetQ.Hosepipe.Usage.txt"))
            {
                // A missing resource is reported as a null stream.
                if (manifest == null)
                {
                    throw new InvalidOperationException("Could not load usage");
                }

                using (var reader = new StreamReader(manifest))
                {
                    Console.Write(reader.ReadToEnd());
                }
            }
        }
    }
}