-
Notifications
You must be signed in to change notification settings - Fork 97
/
Test0.cs
87 lines (77 loc) · 3.51 KB
/
Test0.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using System;
using System.Data;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Text;
using System.Collections.Generic;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.EventHubs;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Dapper;
using Microsoft.Azure.EventHubs.Processor;
namespace StreamingProcessor
{
public static class Test0
{
[FunctionName("Test0")]
public static async Task RunAsync(
[EventHubTrigger("%EventHubName%", Connection = "EventHubsConnectionString", ConsumerGroup = "%ConsumerGroup%")] EventData[] eventHubData,
PartitionContext partitionContext,
ILogger log)
{
var procedureName = Environment.GetEnvironmentVariable("AzureSQLProcedureName");
var payloadName = "payloadType" + (procedureName.EndsWith("_mo") ? "_mo" : "");
var payload = new DataTable(payloadName);
payload.Columns.Add("EventId", typeof(string));
payload.Columns.Add("ComplexData", typeof(string));
payload.Columns.Add("Value", typeof(decimal));
payload.Columns.Add("DeviceId", typeof(string));
payload.Columns.Add("DeviceSequenceNumber", typeof(long));
payload.Columns.Add("Type", typeof(string));
payload.Columns.Add("CreatedAt", typeof(string));
payload.Columns.Add("EnqueuedAt", typeof(DateTime));
payload.Columns.Add("ProcessedAt", typeof(DateTime));
payload.Columns.Add("PartitionId", typeof(int));
Stopwatch sw = new Stopwatch();
sw.Start();
foreach (var data in eventHubData)
{
string message = Encoding.UTF8.GetString(data.Body.Array);
var json = JsonConvert.DeserializeObject<JObject>(message, new JsonSerializerSettings() { DateParseHandling = DateParseHandling.None } );
payload.Rows.Add(
json["eventId"].ToString(),
JsonConvert.SerializeObject(JObject.Parse(json["complexData"].ToString()), Formatting.None),
decimal.Parse(json["value"].ToString()),
json["deviceId"].ToString(),
json["deviceSequenceNumber"],
json["type"].ToString(),
json["createdAt"].ToString(),
data.SystemProperties.EnqueuedTimeUtc,
DateTime.UtcNow,
partitionContext.RuntimeInformation.PartitionId
);
}
try
{
var conn = new SqlConnection(Environment.GetEnvironmentVariable("AzureSQLConnectionString"));
await conn.ExecuteAsync(procedureName, new { @payload = payload.AsTableValuedParameter() }, commandType: CommandType.StoredProcedure);
}
catch (Exception ex)
{
// Retry and/or manage failure, for simplicity just logging the error now
log.LogError($"{ex} - {ex.Message}");
}
sw.Stop();
string logMessage = $"[Test0] T:{eventHubData.Length} doc - E:{sw.ElapsedMilliseconds} msec";
if (eventHubData.Length > 0)
{
logMessage += $" - AVG:{(sw.ElapsedMilliseconds / eventHubData.Length):N3} msec";
}
log.LogInformation(logMessage);
}
}
}