-
Notifications
You must be signed in to change notification settings - Fork 2
/
Program.cs
169 lines (144 loc) · 6.94 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
namespace NotMyFaultModule
{
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Runtime.Loader;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
using Prometheus;
class Program
{
static int counter;
private static readonly Counter CustomCounterMetric =
Metrics.CreateCounter("notmyfault_counter_total", "Cumulative counter, increment only every 500 msec", labelNames: new[] { "edge_device_id", "instance_id", "iothub_name", "module_id" } );
private static readonly Gauge CustomGaugeMetric = Metrics
.CreateGauge("notmyfault_gauge_current", "Gauges can have any numeric value and change arbitrarily, random number every sec", labelNames: new[] { "edge_device_id", "instance_id", "iothub_name", "module_id" });
private static readonly Summary CustomSummaryMetric = Metrics
.CreateSummary("notmyfault_summary_bytes", "Summaries track the trends in events over time (10 minutes by default).", labelNames: new[] { "edge_device_id", "instance_id", "iothub_name", "module_id" });
static void MyHandler(object sender, UnhandledExceptionEventArgs args)
{
Exception e = (Exception)args.ExceptionObject;
Console.WriteLine("MyHandler caught : " + e.Message);
Console.WriteLine("Runtime terminating: {0}", args.IsTerminating);
}
static void Main(string[] args)
{
var server = new MetricServer(9600);
server.Start();
AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(MyHandler);
Init().Wait();
// Wait until the app unloads or is cancelled
var cts = new CancellationTokenSource();
AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
string deviceId = Environment.GetEnvironmentVariable("IOTEDGE_DEVICEID");
string instanceNumber = Guid.NewGuid().ToString();
string iothubHostname = Environment.GetEnvironmentVariable("IOTEDGE_IOTHUBHOSTNAME");
string moduleId = Environment.GetEnvironmentVariable("IOTEDGE_MODULEID");
Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
Task.Delay(500);
CustomCounterMetric.WithLabels(deviceId, instanceNumber, iothubHostname, moduleId).Inc();
//if (CustomCounterMetric.Value % 50 == 0)
//{
// Console.WriteLine($"CustomCounterMetric is now {CustomCounterMetric.Value}");
//}
}
Console.WriteLine($"Exiting from metric loop");
});
Random r = new();
Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
Task.Delay(1000);
CustomGaugeMetric.WithLabels(deviceId, instanceNumber, iothubHostname, moduleId).Set(r.Next(0, 500));
}
});
Random randomSummary = new();
Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
Task.Delay(1000);
CustomSummaryMetric.WithLabels(deviceId, instanceNumber, iothubHostname, moduleId).Observe(randomSummary.NextDouble() * 1024 * 1024);
}
});
WhenCancelled(cts.Token).Wait();
}
/// <summary>
/// Handles cleanup operations when app is cancelled or unloads
/// </summary>
public static Task WhenCancelled(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
return tcs.Task;
}
/// <summary>
/// Initializes the ModuleClient and sets up the callback to receive
/// messages containing temperature information
/// </summary>
static async Task Init()
{
MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
ITransportSettings[] settings = { mqttSetting };
// Open a connection to the Edge runtime
ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
await ioTHubModuleClient.OpenAsync();
Console.WriteLine("IoT Hub module client initialized.");
// Register callback to be called when a message is received by the module
await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, ioTHubModuleClient);
await ioTHubModuleClient.SetMethodHandlerAsync("terminate", FaultMethodInvoked, ioTHubModuleClient);
}
private async static Task<MethodResponse> FaultMethodInvoked(MethodRequest methodRequest, object userContext)
{
Task.Run(() => FaultMethodImplementation());
Console.WriteLine("method invoked");
return new MethodResponse(200);
}
private async static void FaultMethodImplementation()
{
await Task.Delay(5000);
System.Environment.Exit(-1);
}
/// <summary>
/// This method is called whenever the module is sent a message from the EdgeHub.
/// It just pipe the messages without any change.
/// It prints all the incoming messages.
/// </summary>
static async Task<MessageResponse> PipeMessage(Message message, object userContext)
{
int counterValue = Interlocked.Increment(ref counter);
var moduleClient = userContext as ModuleClient;
if (moduleClient == null)
{
throw new InvalidOperationException("UserContext doesn't contain " + "expected values");
}
byte[] messageBytes = message.GetBytes();
string messageString = Encoding.UTF8.GetString(messageBytes);
Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");
if (!string.IsNullOrEmpty(messageString))
{
using (var pipeMessage = new Message(messageBytes))
{
foreach (var prop in message.Properties)
{
pipeMessage.Properties.Add(prop.Key, prop.Value);
}
await moduleClient.SendEventAsync("output1", pipeMessage);
Console.WriteLine("Received message sent");
}
}
return MessageResponse.Completed;
}
}
}