diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs index d0337a6511b..1680b74f119 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs @@ -8,10 +8,10 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs public interface ILogsProcessor { - Task> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter); + Task> GetMessages(string id, Stream stream, ModuleLogFilter filter); - Task> GetText(Stream stream, string moduleId, ModuleLogFilter filter); + Task> GetText(string id, Stream stream, ModuleLogFilter filter); - Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func, Task> callback); + Task ProcessLogsStream(string id, Stream stream, ModuleLogOptions logOptions, Func, Task> callback); } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProvider.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProvider.cs index 9f7997a84f3..e385ade3d25 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProvider.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProvider.cs @@ -2,13 +2,16 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; public interface ILogsProvider { - Task GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken); + Task GetLogs(string id, ModuleLogOptions logOptions, CancellationToken cancellationToken); - Task GetLogsStream(ModuleLogOptions logOptions, Func, Task> callback, CancellationToken cancellationToken); + Task GetLogsStream(string id, ModuleLogOptions logOptions, Func, Task> callback, CancellationToken cancellationToken); + + Task GetLogsStream(IList<(string id, ModuleLogOptions logOptions)> ids, Func, Task> callback, CancellationToken cancellationToken); } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogRequestItem.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogRequestItem.cs new file mode 100644 index 00000000000..3aa0b14d84f --- /dev/null +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogRequestItem.cs @@ -0,0 +1,21 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs +{ + using Microsoft.Azure.Devices.Edge.Util; + using Newtonsoft.Json; + + public class LogRequestItem + { + public LogRequestItem(string id, ModuleLogFilter filter) + { + this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id)); + this.Filter = filter ?? ModuleLogFilter.Empty; + } + + [JsonProperty("id")] + public string Id { get; } + + [JsonProperty("filter")] + public ModuleLogFilter Filter { get; } + } +} diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs index 2c799c8a7be..12b6f0e0af7 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs @@ -42,13 +42,13 @@ public LogsProcessor(ILogMessageParser logMessageParser) this.materializer = this.system.Materializer(); } - public async Task> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter) + public async Task> GetMessages(string id, Stream stream, ModuleLogFilter filter) { Preconditions.CheckNotNull(stream, nameof(stream)); Preconditions.CheckNotNull(filter, nameof(filter)); - Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId)); + Preconditions.CheckNonWhiteSpace(id, nameof(id)); - GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId)); + GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id)); filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l)); filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text))); IRunnableGraph>> graph = graphBuilder.GetMaterializingGraph(m => (ModuleLogMessage)m); @@ -57,17 +57,17 @@ public async Task> GetMessages(Stream stream, st return result; } - public async Task> GetText(Stream stream, string moduleId, ModuleLogFilter filter) + public async Task> GetText(string id, Stream stream, ModuleLogFilter filter) { Preconditions.CheckNotNull(stream, nameof(stream)); Preconditions.CheckNotNull(filter, nameof(filter)); - Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId)); + Preconditions.CheckNonWhiteSpace(id, nameof(id)); IRunnableGraph>> GetGraph() { if (filter.Regex.HasValue || filter.LogLevel.HasValue) { - GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId)); + GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id)); filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l)); filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text))); return graphBuilder.GetMaterializingGraph(m => m.FullText); @@ -83,9 +83,8 @@ IRunnableGraph>> GetGraph() return result; } - public async Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func, Task> callback) + public async Task ProcessLogsStream(string id, Stream stream, ModuleLogOptions logOptions, Func, Task> callback) { - string id = logOptions.Id; GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id)); logOptions.Filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l)); logOptions.Filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text))); diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs index ccab92e1047..7e9c985f70b 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs @@ -3,7 +3,10 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs { using System; using System.Collections.Generic; + using System.Collections.Immutable; using System.IO; + using System.Linq; + using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Devices.Edge.Storage; @@ -21,27 +24,81 @@ public LogsProvider(IRuntimeInfoProvider runtimeInfoProvider, ILogsProcessor log this.logsProcessor = Preconditions.CheckNotNull(logsProcessor, nameof(logsProcessor)); } - public async Task GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken) + public async Task GetLogs(string id, ModuleLogOptions logOptions, CancellationToken cancellationToken) { Preconditions.CheckNotNull(logOptions, nameof(logOptions)); - Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken); - Events.ReceivedStream(logOptions.Id); + Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, false, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken); + Events.ReceivedStream(id); - byte[] logBytes = await this.GetProcessedLogs(logsStream, logOptions); + byte[] logBytes = await this.GetProcessedLogs(id, logsStream, logOptions); return logBytes; } - public async Task GetLogsStream(ModuleLogOptions logOptions, Func, Task> callback, CancellationToken cancellationToken) + // The id parameter is a regex. Logs for all modules that match this regex are processed. + public async Task GetLogsStream(string id, ModuleLogOptions logOptions, Func, Task> callback, CancellationToken cancellationToken) { Preconditions.CheckNotNull(logOptions, nameof(logOptions)); Preconditions.CheckNotNull(callback, nameof(callback)); - Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken); - Events.ReceivedStream(logOptions.Id); + IEnumerable modules = (await this.runtimeInfoProvider.GetModules(cancellationToken)) + .Select(m => m.Name); + ISet matchingIds = GetMatchingIds(id, modules); + Events.StreamingLogs(matchingIds, logOptions); + IEnumerable streamingTasks = matchingIds.Select(i => this.GetLogsStreamInternal(i, logOptions, callback, cancellationToken)); + await Task.WhenAll(streamingTasks); + } - await (NeedToProcessStream(logOptions) - ? this.logsProcessor.ProcessLogsStream(logsStream, logOptions, callback) - : this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken)); + // The id parameter in the ids is a regex. Logs for all modules that match this regex are processed. + // If multiple id parameters match a module, the first one is considered. + public async Task GetLogsStream(IList<(string id, ModuleLogOptions logOptions)> ids, Func, Task> callback, CancellationToken cancellationToken) + { + Preconditions.CheckNotNull(ids, nameof(ids)); + Preconditions.CheckNotNull(callback, nameof(callback)); + + IList modules = (await this.runtimeInfoProvider.GetModules(cancellationToken)) + .Select(m => m.Name) + .ToList(); + + IDictionary idsToProcess = GetIdsToProcess(ids, modules); + Events.StreamingLogs(idsToProcess); + IEnumerable streamingTasks = idsToProcess.Select(kvp => this.GetLogsStreamInternal(kvp.Key, kvp.Value, callback, cancellationToken)); + await Task.WhenAll(streamingTasks); + } + + internal static IDictionary GetIdsToProcess(IList<(string id, ModuleLogOptions logOptions)> idList, IList allIds) + { + var idsToProcess = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach ((string regex, ModuleLogOptions logOptions) in idList) + { + ISet ids = GetMatchingIds(regex, allIds); + if (ids.Count == 0) + { + Events.NoMatchingModule(regex, allIds); + } + else + { + foreach (string id in ids) + { + if (!idsToProcess.ContainsKey(id)) + { + idsToProcess[id] = logOptions; + } + } + } + } + + return idsToProcess; + } + + internal static ISet GetMatchingIds(string id, IEnumerable ids) + { + if (!id.Equals(Constants.AllModulesIdentifier, StringComparison.OrdinalIgnoreCase)) + { + var regex = new Regex(id, RegexOptions.IgnoreCase); + ids = ids.Where(m => regex.IsMatch(m)); + } + + return ids.ToImmutableHashSet(); } internal static bool NeedToProcessStream(ModuleLogOptions logOptions) => @@ -50,6 +107,19 @@ internal static bool NeedToProcessStream(ModuleLogOptions logOptions) => || logOptions.ContentEncoding != LogsContentEncoding.None || logOptions.ContentType != LogsContentType.Text; + internal async Task GetLogsStreamInternal(string id, ModuleLogOptions logOptions, Func, Task> callback, CancellationToken cancellationToken) + { + Preconditions.CheckNotNull(logOptions, nameof(logOptions)); + Preconditions.CheckNotNull(callback, nameof(callback)); + + Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken); + Events.ReceivedStream(id); + + await (NeedToProcessStream(logOptions) + ? this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback) + : this.WriteLogsStreamToOutput(id, callback, logsStream, cancellationToken)); + } + static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) => contentEncoding == LogsContentEncoding.Gzip ? Compression.CompressToGzip(bytes) @@ -85,23 +155,23 @@ async Task WriteLogsStreamToOutput(string id, Func, Task> cal } } - async Task GetProcessedLogs(Stream logsStream, ModuleLogOptions logOptions) + async Task GetProcessedLogs(string id, Stream logsStream, ModuleLogOptions logOptions) { - byte[] logBytes = await this.ProcessByContentType(logsStream, logOptions); + byte[] logBytes = await this.ProcessByContentType(id, logsStream, logOptions); logBytes = ProcessByContentEncoding(logBytes, logOptions.ContentEncoding); return logBytes; } - async Task ProcessByContentType(Stream logsStream, ModuleLogOptions logOptions) + async Task ProcessByContentType(string id, Stream logsStream, ModuleLogOptions logOptions) { switch (logOptions.ContentType) { case LogsContentType.Json: - IEnumerable logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id, logOptions.Filter); + IEnumerable logMessages = await this.logsProcessor.GetMessages(id, logsStream, logOptions.Filter); return logMessages.ToBytes(); default: - IEnumerable logTexts = await this.logsProcessor.GetText(logsStream, logOptions.Id, logOptions.Filter); + IEnumerable logTexts = await this.logsProcessor.GetText(id, logsStream, logOptions.Filter); string logTextString = logTexts.Join(string.Empty); return logTextString.ToBytes(); } @@ -117,7 +187,9 @@ enum EventIds StreamingCancelled = IdStart, ErrorWhileStreaming, ReceivedStream, - StreamingCompleted + StreamingCompleted, + StreamingLogs, + NoMatchingModule } public static void ErrorWhileProcessingStream(string id, Exception ex) @@ -140,6 +212,23 @@ public static void StreamingCompleted(string id) { Log.LogInformation((int)EventIds.StreamingCompleted, $"Completed streaming logs for {id}"); } + + public static void StreamingLogs(IDictionary idsToProcess) + { + Log.LogDebug((int)EventIds.StreamingLogs, $"Streaming logs for {idsToProcess.ToJson()}"); + } + + public static void NoMatchingModule(string regex, IList allIds) + { + string idsString = allIds.Join(", "); + Log.LogWarning((int)EventIds.NoMatchingModule, $"The regex {regex} in the log stream request did not match any of the modules - {idsString}"); + } + + internal static void StreamingLogs(ISet ids, ModuleLogOptions logOptions) + { + string idsString = ids.Join(","); + Log.LogDebug((int)EventIds.StreamingLogs, $"Streaming logs for {idsString} with options {logOptions.ToJson()}"); + } } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogOptions.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogOptions.cs index 9d6ab46bffb..c05c6a9a0a9 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogOptions.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogOptions.cs @@ -7,16 +7,13 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs public class ModuleLogOptions : IEquatable { - public ModuleLogOptions(string id, LogsContentEncoding contentEncoding, LogsContentType contentType, ModuleLogFilter filter) + public ModuleLogOptions(LogsContentEncoding contentEncoding, LogsContentType contentType, ModuleLogFilter filter) { - this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id)); this.ContentEncoding = contentEncoding; this.ContentType = contentType; this.Filter = Preconditions.CheckNotNull(filter, nameof(filter)); } - public string Id { get; } - public LogsContentEncoding ContentEncoding { get; } public LogsContentType ContentType { get; } @@ -28,7 +25,6 @@ public override bool Equals(object obj) public bool Equals(ModuleLogOptions other) => other != null && - this.Id == other.Id && this.ContentEncoding == other.ContentEncoding && this.ContentType == other.ContentType && EqualityComparer.Default.Equals(this.Filter, other.Filter); @@ -36,7 +32,6 @@ public bool Equals(ModuleLogOptions other) public override int GetHashCode() { var hashCode = -1683996196; - hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(this.Id); hashCode = hashCode * -1521134295 + this.ContentEncoding.GetHashCode(); hashCode = hashCode * -1521134295 + this.ContentType.GetHashCode(); hashCode = hashCode * -1521134295 + EqualityComparer.Default.GetHashCode(this.Filter); diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/requests/LogsUploadRequestHandler.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/requests/LogsUploadRequestHandler.cs index 2fcea1f8282..e2e331451ee 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/requests/LogsUploadRequestHandler.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/requests/LogsUploadRequestHandler.cs @@ -41,16 +41,17 @@ async Task> GetModuleIds() } } + var moduleLogOptions = new ModuleLogOptions(payload.Encoding, payload.ContentType, payload.Filter); IList moduleIds = await GetModuleIds(); - IEnumerable uploadTasks = moduleIds.Select(m => this.UploadLogs(payload.SasUrl, new ModuleLogOptions(m, payload.Encoding, payload.ContentType, payload.Filter), cancellationToken)); + IEnumerable uploadTasks = moduleIds.Select(m => this.UploadLogs(payload.SasUrl, m, moduleLogOptions, cancellationToken)); await Task.WhenAll(uploadTasks); return Option.None(); } - async Task UploadLogs(string sasUrl, ModuleLogOptions moduleLogOptions, CancellationToken token) + async Task UploadLogs(string sasUrl, string id, ModuleLogOptions moduleLogOptions, CancellationToken token) { - byte[] logBytes = await this.logsProvider.GetLogs(moduleLogOptions, token); - await this.logsUploader.Upload(sasUrl, moduleLogOptions.Id, logBytes, moduleLogOptions.ContentEncoding, moduleLogOptions.ContentType); + byte[] logBytes = await this.logsProvider.GetLogs(id, moduleLogOptions, token); + await this.logsUploader.Upload(sasUrl, id, logBytes, moduleLogOptions.ContentEncoding, moduleLogOptions.ContentType); } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/EdgeClientWebSocket.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/EdgeClientWebSocket.cs index e099362cbd2..1e4f5de6c67 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/EdgeClientWebSocket.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/EdgeClientWebSocket.cs @@ -6,14 +6,18 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Concurrency; class EdgeClientWebSocket : IClientWebSocket { readonly ClientWebSocket clientWebSocket; + // TODO: Check if locking is necessary here and if separate locks can be used for reading and writing. + readonly AsyncLock clientWebSocketLock; EdgeClientWebSocket(ClientWebSocket clientWebSocket) { this.clientWebSocket = Preconditions.CheckNotNull(clientWebSocket, nameof(clientWebSocket)); + this.clientWebSocketLock = new AsyncLock(); } public static async Task Connect(Uri url, string authToken, CancellationToken cancellationToken) @@ -26,13 +30,28 @@ public static async Task Connect(Uri url, string authToken, Ca public WebSocketState State => this.clientWebSocket.State; - public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) - => this.clientWebSocket.CloseAsync(closeStatus, statusDescription, cancellationToken); + public async Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken) + { + using (await this.clientWebSocketLock.LockAsync(cancellationToken)) + { + await this.clientWebSocket.CloseAsync(closeStatus, statusDescription, cancellationToken); + } + } - public Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellationToken) - => this.clientWebSocket.ReceiveAsync(buffer, cancellationToken); + public async Task ReceiveAsync(ArraySegment buffer, CancellationToken cancellationToken) + { + using (await this.clientWebSocketLock.LockAsync(cancellationToken)) + { + return await this.clientWebSocket.ReceiveAsync(buffer, cancellationToken); + } + } - public Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) - => this.clientWebSocket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + public async Task SendAsync(ArraySegment buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken) + { + using (await this.clientWebSocketLock.LockAsync(cancellationToken)) + { + await this.clientWebSocket.SendAsync(buffer, messageType, endOfMessage, cancellationToken); + } + } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs index 76393117b77..6e9acdb0ffa 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs @@ -1,27 +1,35 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream { + using System.Collections.Generic; using System.ComponentModel; using Microsoft.Azure.Devices.Edge.Agent.Core.Logs; using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Json; using Newtonsoft.Json; public class LogsStreamRequest { - public LogsStreamRequest(string schemaVersion, string id, LogsContentEncoding encoding, LogsContentType contentType, ModuleLogFilter filter) + public LogsStreamRequest(string schemaVersion, List items, LogsContentEncoding encoding, LogsContentType contentType) { this.SchemaVersion = schemaVersion; - this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id)); - this.Filter = filter ?? ModuleLogFilter.Empty; + this.Items = Preconditions.CheckNotNull(items, nameof(items)); this.Encoding = encoding; this.ContentType = contentType; } + [JsonConstructor] + LogsStreamRequest(string schemaVersion, List items, LogsContentEncoding? encoding, LogsContentType? contentType) + : this(schemaVersion, items, encoding ?? LogsContentEncoding.None, contentType ?? LogsContentType.Text) + { + } + [JsonProperty("schemaVersion")] public string SchemaVersion { get; } - [JsonProperty("id")] - public string Id { get; } + [JsonProperty("items")] + [JsonConverter(typeof(SingleOrArrayConverter))] + public List Items { get; } [JsonProperty("encoding", DefaultValueHandling = DefaultValueHandling.Populate)] [DefaultValue(LogsContentEncoding.None)] @@ -30,8 +38,5 @@ public LogsStreamRequest(string schemaVersion, string id, LogsContentEncoding en [JsonProperty("contentType", DefaultValueHandling = DefaultValueHandling.Populate)] [DefaultValue(LogsContentType.Text)] public LogsContentType ContentType { get; } - - [JsonProperty("filter")] - public ModuleLogFilter Filter { get; } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs index ce759ac3e3e..d4730c887fd 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs @@ -2,6 +2,8 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream { using System; + using System.Collections.Generic; + using System.Linq; using System.Net.WebSockets; using System.Text; using System.Threading; @@ -14,6 +16,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream public class LogsStreamRequestHandler : IStreamRequestHandler { + const int MaxLogRequestSizeBytes = 8192; // 8kb readonly ILogsProvider logsProvider; public LogsStreamRequestHandler(ILogsProvider logsProvider) @@ -28,14 +31,13 @@ public async Task Handle(IClientWebSocket clientWebSocket, CancellationToken can LogsStreamRequest streamRequest = await this.ReadLogsStreamingRequest(clientWebSocket, cancellationToken); Events.RequestData(streamRequest); - var logOptions = new ModuleLogOptions(streamRequest.Id, streamRequest.Encoding, streamRequest.ContentType, streamRequest.Filter); var socketCancellationTokenSource = new CancellationTokenSource(); Task ProcessLogsFrame(ArraySegment bytes) { if (clientWebSocket.State != WebSocketState.Open) { - Events.WebSocketNotOpen(streamRequest.Id, clientWebSocket.State); + Events.WebSocketNotOpen(streamRequest, clientWebSocket.State); socketCancellationTokenSource.Cancel(); return Task.CompletedTask; } @@ -45,12 +47,14 @@ Task ProcessLogsFrame(ArraySegment bytes) } } + IList<(string id, ModuleLogOptions logOptions)> logOptionsList = streamRequest.Items.Select(i => (i.Id, new ModuleLogOptions(streamRequest.Encoding, streamRequest.ContentType, i.Filter))).ToList(); + using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, socketCancellationTokenSource.Token)) { - await this.logsProvider.GetLogsStream(logOptions, ProcessLogsFrame, linkedCts.Token); + await this.logsProvider.GetLogsStream(logOptionsList, ProcessLogsFrame, linkedCts.Token); } - Events.StreamingCompleted(streamRequest.Id); + Events.StreamingCompleted(streamRequest); } catch (Exception e) { @@ -60,7 +64,8 @@ Task ProcessLogsFrame(ArraySegment bytes) async Task ReadLogsStreamingRequest(IClientWebSocket clientWebSocket, CancellationToken cancellationToken) { - var buf = new byte[1024]; + // Max size of the request can be 8k + var buf = new byte[MaxLogRequestSizeBytes]; var arrSeg = new ArraySegment(buf); WebSocketReceiveResult result = await clientWebSocket.ReceiveAsync(arrSeg, cancellationToken); if (result.Count > 0) @@ -95,15 +100,19 @@ public static void RequestData(LogsStreamRequest streamRequest) Log.LogInformation((int)EventIds.RequestData, $"Logs streaming request data - {streamRequest.ToJson()}"); } - public static void StreamingCompleted(string id) + public static void StreamingCompleted(LogsStreamRequest logStreamRequest) { + string id = GetIds(logStreamRequest); Log.LogInformation((int)EventIds.StreamingCompleted, $"Completed streaming logs for {id}"); } - public static void WebSocketNotOpen(string id, WebSocketState state) + public static void WebSocketNotOpen(LogsStreamRequest logStreamRequest, WebSocketState state) { + string id = GetIds(logStreamRequest); Log.LogInformation((int)EventIds.WebSocketNotOpen, $"Terminating streaming logs for {id} because WebSocket state is {state}"); } + + static string GetIds(LogsStreamRequest logStreamRequest) => logStreamRequest.Items.Select(i => i.Id).Join(","); } } } diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs index 4c3c7f43829..85ec56473e3 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs @@ -73,7 +73,7 @@ public async Task GetTextTest() var filter = ModuleLogFilter.Empty; // Act - IEnumerable textLines = await logsProcessor.GetText(stream, moduleId, filter); + IEnumerable textLines = await logsProcessor.GetText(moduleId, stream, filter); // Assert Assert.NotNull(textLines); @@ -93,7 +93,7 @@ public async Task GetMessagesTest() var filter = ModuleLogFilter.Empty; // Act - IEnumerable logMessages = await logsProcessor.GetMessages(stream, moduleId, filter); + IEnumerable logMessages = await logsProcessor.GetMessages(moduleId, stream, filter); // Assert Assert.NotNull(logMessages); @@ -129,7 +129,7 @@ public async Task GetTextWithRegexFilterTest() var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.None(), Option.Some(regex)); // Act - IEnumerable textLines = await logsProcessor.GetText(stream, moduleId, filter); + IEnumerable textLines = await logsProcessor.GetText(moduleId, stream, filter); // Assert Assert.NotNull(textLines); @@ -151,7 +151,7 @@ public async Task GetMessagesWithRegexFilterTest() var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.None(), Option.Some(regex)); // Act - IEnumerable logMessages = await logsProcessor.GetMessages(stream, moduleId, filter); + IEnumerable logMessages = await logsProcessor.GetMessages(moduleId, stream, filter); // Assert Assert.NotNull(logMessages); @@ -188,7 +188,7 @@ public async Task GetTextWithLogLevelFilterTest(int logLevel, List expected var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.None()); // Act - List textLines = (await logsProcessor.GetText(stream, moduleId, filter)).ToList(); + List textLines = (await logsProcessor.GetText(moduleId, stream, filter)).ToList(); // Assert Assert.NotNull(textLines); @@ -213,7 +213,7 @@ public async Task GetMessagesWithLogLevelFilterTest(int logLevel, List expe var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.None()); // Act - IEnumerable logMessages = await logsProcessor.GetMessages(stream, moduleId, filter); + IEnumerable logMessages = await logsProcessor.GetMessages(moduleId, stream, filter); // Assert Assert.NotNull(logMessages); @@ -241,7 +241,7 @@ public async Task GetTextWithMultipleFiltersTest(int logLevel, string regex, Lis var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); // Act - List textLines = (await logsProcessor.GetText(stream, moduleId, filter)).ToList(); + List textLines = (await logsProcessor.GetText(moduleId, stream, filter)).ToList(); // Assert Assert.NotNull(textLines); @@ -266,7 +266,7 @@ public async Task GetMessagesWithMultipleFiltersTest(int logLevel, string regex, var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); // Act - IEnumerable logMessages = await logsProcessor.GetMessages(stream, moduleId, filter); + IEnumerable logMessages = await logsProcessor.GetMessages(moduleId, stream, filter); // Assert Assert.NotNull(logMessages); @@ -293,7 +293,7 @@ public async Task ProcessStreamTest() var logsProcessor = new LogsProcessor(logMessageParser); var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Text, filter); + var logOptions = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, filter); var receivedBytes = new List(); @@ -304,7 +304,7 @@ Task Callback(ArraySegment bytes) } // Act - await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await logsProcessor.ProcessLogsStream(moduleId, stream, logOptions, Callback); await Task.Delay(TimeSpan.FromSeconds(5)); // Assert @@ -329,7 +329,7 @@ public async Task ProcessStreamToMessageTest() var logsProcessor = new LogsProcessor(logMessageParser); var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Json, filter); + var logOptions = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Json, filter); var receivedBytes = new List(); @@ -340,7 +340,7 @@ Task Callback(ArraySegment bytes) } // Act - await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await logsProcessor.ProcessLogsStream(moduleId, stream, logOptions, Callback); await Task.Delay(TimeSpan.FromSeconds(5)); // Assert @@ -366,7 +366,7 @@ public async Task ProcessStreamWithGzipTest() var logsProcessor = new LogsProcessor(logMessageParser); var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, filter); + var logOptions = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, filter); var receivedBytes = new List(); @@ -377,7 +377,7 @@ Task Callback(ArraySegment bytes) } // Act - await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await logsProcessor.ProcessLogsStream(moduleId, stream, logOptions, Callback); await Task.Delay(TimeSpan.FromSeconds(5)); // Assert diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs index 05470885d8b..abc8741a882 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs @@ -27,6 +27,17 @@ public class LogsProviderTest "<3> 2019-05-08 02:23:23.137 +00:00 [ERR] - Something really bad happened.\n" }; + public static IEnumerable GetNeedToProcessStreamTestData() + { + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty), false }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), false }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, ModuleLogFilter.Empty), true }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.Some("foo"))), true }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.None())), true }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.Some("foo"))), true }; + yield return new object[] { new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Json, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), true }; + } + [Fact] public async Task GetLogsAsTextTest() { @@ -46,10 +57,10 @@ public async Task GetLogsAsTextTest() var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); + var logOptions = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); // Act - byte[] bytes = await logsProvider.GetLogs(logOptions, cancellationToken); + byte[] bytes = await logsProvider.GetLogs(moduleId, logOptions, cancellationToken); // Assert string logsText = Encoding.UTF8.GetString(bytes); @@ -75,10 +86,10 @@ public async Task GetLogsAsTextWithCompressionTest() var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, ModuleLogFilter.Empty); + var logOptions = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, ModuleLogFilter.Empty); // Act - byte[] bytes = await logsProvider.GetLogs(logOptions, cancellationToken); + byte[] bytes = await logsProvider.GetLogs(moduleId, logOptions, cancellationToken); // Assert byte[] decompressedBytes = Compression.DecompressFromGzip(bytes); @@ -104,10 +115,10 @@ public async Task GetLogsAsJsonTest() var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Json, ModuleLogFilter.Empty); + var logOptions = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Json, ModuleLogFilter.Empty); // Act - byte[] bytes = await logsProvider.GetLogs(logOptions, cancellationToken); + byte[] bytes = await logsProvider.GetLogs(moduleId, logOptions, cancellationToken); // Assert var logMessages = bytes.FromBytes>(); @@ -145,10 +156,10 @@ public async Task GetLogsAsJsonWithCompressionTest() var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Json, ModuleLogFilter.Empty); + var logOptions = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Json, ModuleLogFilter.Empty); // Act - byte[] bytes = await logsProvider.GetLogs(logOptions, cancellationToken); + byte[] bytes = await logsProvider.GetLogs(moduleId, logOptions, cancellationToken); // Assert byte[] decompressedBytes = Compression.DecompressFromGzip(bytes); @@ -184,11 +195,13 @@ public async Task GetLogsStreamTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, true, tail, since, cancellationToken)) .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(new[] { new ModuleRuntimeInfo(moduleId, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) }); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); + var logOptions = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); var receivedBytes = new List(); @@ -199,7 +212,7 @@ Task Callback(ArraySegment bytes) } // Act - await logsProvider.GetLogsStream(logOptions, Callback, cancellationToken); + await logsProvider.GetLogsStream(moduleId, logOptions, Callback, cancellationToken); // Assert Assert.NotEmpty(receivedBytes); @@ -221,12 +234,14 @@ public async Task GetLogsStreamWithFiltersTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, true, tail, since, cancellationToken)) .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(new[] { new ModuleRuntimeInfo(moduleId, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) }); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); var filter = new ModuleLogFilter(tail, since, Option.Some(6), Option.Some("Starting")); - var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, filter); + var logOptions = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, filter); var receivedBytes = new List(); @@ -237,7 +252,7 @@ Task Callback(ArraySegment bytes) } // Act - await logsProvider.GetLogsStream(logOptions, Callback, cancellationToken); + await logsProvider.GetLogsStream(moduleId, logOptions, Callback, cancellationToken); await Task.Delay(TimeSpan.FromSeconds(3)); // Assert @@ -249,6 +264,142 @@ Task Callback(ArraySegment bytes) Assert.Equal(TestLogTexts[0], receivedText); } + [Fact] + public async Task GetLogsStreamWithMultipleModulesTest() + { + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + Option tail = Option.Some(10); + Option since = Option.Some(1552887267); + CancellationToken cancellationToken = CancellationToken.None; + + string moduleId1 = "mod1"; + string moduleId2 = "mod2"; + + var filter1 = new ModuleLogFilter(tail, since, Option.Some(6), Option.Some("Starting")); + var filter2 = new ModuleLogFilter(Option.None(), Option.None(), Option.None(), Option.Some("bad")); + + byte[] dockerLogsStreamBytes1 = DockerFraming.Frame(TestLogTexts); + byte[] dockerLogsStreamBytes2 = DockerFraming.Frame(TestLogTexts); + + var modulesList = new List + { + new ModuleRuntimeInfo(moduleId1, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()), + new ModuleRuntimeInfo(moduleId2, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) + }; + + var runtimeInfoProvider = new Mock(); + runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId1, true, tail, since, cancellationToken)) + .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes1)); + runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId2, true, Option.None(), Option.None(), cancellationToken)) + .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes2)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(modulesList); + + var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); + var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); + + var logOptions1 = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, filter1); + var logOptions2 = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, filter2); + var logIds = new List<(string, ModuleLogOptions)> { (moduleId1, logOptions1), (moduleId2, logOptions2) }; + + var receivedBytes = new List(); + Task Callback(ArraySegment bytes) + { + receivedBytes.Add(bytes.ToArray()); + return Task.CompletedTask; + } + + var expectedTextLines = new List { TestLogTexts[0], TestLogTexts[3], TestLogTexts[4] }; + expectedTextLines.Sort(); + + // Act + await logsProvider.GetLogsStream(logIds, Callback, cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(6)); + + // Assert + Assert.NotEmpty(receivedBytes); + List receivedText = receivedBytes + .Select( + r => + Compression.DecompressFromGzip(r) + .Skip(8) + .ToArray() + .FromBytes()) + .ToList(); + receivedText.Sort(); + + Assert.Equal(expectedTextLines, receivedText); + } + + [Fact] + public async Task GetLogsStreamWithMultipleModulesWithRegexMatchTest() + { + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + Option tail = Option.None(); + Option since = Option.None(); + CancellationToken cancellationToken = CancellationToken.None; + + string moduleId1 = "mod1"; + string moduleId2 = "mod2"; + + var filter = new ModuleLogFilter(tail, since, Option.None(), Option.Some("bad")); + + byte[] dockerLogsStreamBytes1 = DockerFraming.Frame(TestLogTexts); + byte[] dockerLogsStreamBytes2 = DockerFraming.Frame(TestLogTexts); + + var modulesList = new List + { + new ModuleRuntimeInfo(moduleId1, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()), + new ModuleRuntimeInfo(moduleId2, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) + }; + + var runtimeInfoProvider = new Mock(); + runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId1, true, tail, since, cancellationToken)) + .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes1)); + runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId2, true, tail, since, cancellationToken)) + .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes2)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(modulesList); + + var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); + var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); + + string regex = "mod"; + var logOptions = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Text, filter); + + var receivedBytes = new List(); + Task Callback(ArraySegment bytes) + { + receivedBytes.Add(bytes.ToArray()); + return Task.CompletedTask; + } + + var expectedTextLines = new List { TestLogTexts[3], TestLogTexts[4], TestLogTexts[3], TestLogTexts[4] }; + expectedTextLines.Sort(); + + // Act + await logsProvider.GetLogsStream(regex, logOptions, Callback, cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(6)); + + // Assert + Assert.NotEmpty(receivedBytes); + List receivedText = receivedBytes + .Select( + r => + Compression.DecompressFromGzip(r) + .Skip(8) + .ToArray() + .FromBytes()) + .ToList(); + receivedText.Sort(); + + Assert.Equal(expectedTextLines, receivedText); + } + [Theory] [MemberData(nameof(GetNeedToProcessStreamTestData))] public void NeedToProcessStreamTest(ModuleLogOptions logOptions, bool expectedResult) @@ -256,15 +407,125 @@ public void NeedToProcessStreamTest(ModuleLogOptions logOptions, bool expectedRe Assert.Equal(expectedResult, LogsProvider.NeedToProcessStream(logOptions)); } - public static IEnumerable GetNeedToProcessStreamTestData() + [Theory] + [MemberData(nameof(GetMatchingIdsTestData))] + public void GetMatchingIdsTest(string regex, IList moduleIds, IList expectedList) + { + ISet actualModules = LogsProvider.GetMatchingIds(regex, moduleIds); + Assert.Equal(expectedList.OrderBy(i => i), actualModules.OrderBy(i => i)); + } + + [Theory] + [MemberData(nameof(GetIdsToProcessTestData))] + public void GetIdsToProcessTest(IList<(string id, ModuleLogOptions logOptions)> idList, IList allIds, IDictionary expectedIdsToProcess) + { + IDictionary idsToProcess = LogsProvider.GetIdsToProcess(idList, allIds); + Assert.Equal(expectedIdsToProcess, idsToProcess); + } + + public static IEnumerable GetIdsToProcessTestData() { - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty), false }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), false }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.Gzip, LogsContentType.Text, ModuleLogFilter.Empty), true }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.Some("foo"))), true }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.None())), true }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.Some("foo"))), true }; - yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Json, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), true }; + var logOptions1 = new ModuleLogOptions(LogsContentEncoding.Gzip, LogsContentType.Json, ModuleLogFilter.Empty); + var logOptions2 = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); + var logOptions3 = new ModuleLogOptions(LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(100), Option.None(), Option.None(), Option.None())); + + yield return new object[] + { + new List<(string id, ModuleLogOptions logOptions)> + { + ("edgeAgent", logOptions1), + ("edgeHub", logOptions2), + ("tempSensor", logOptions3) + }, + new List { "edgeAgent", "edgeHub", "tempSensor", "eModule2" }, + new Dictionary + { + ["edgeAgent"] = logOptions1, + ["edgeHub"] = logOptions2, + ["tempSensor"] = logOptions3 + } + }; + + yield return new object[] + { + new List<(string id, ModuleLogOptions logOptions)> + { + ("edgeAgent", logOptions1), + ("edgeHub", logOptions2), + ("tempSensor", logOptions3) + }, + new List { "edgeAgent", "edgeHub", "tempSimulator", "eModule2" }, + new Dictionary + { + ["edgeAgent"] = logOptions1, + ["edgeHub"] = logOptions2 + } + }; + + yield return new object[] + { + new List<(string id, ModuleLogOptions logOptions)> + { + ("edge", logOptions1), + ("edgeHub", logOptions2), + ("e.*e", logOptions3) + }, + new List { "edgeAgent", "edgeHub", "module1", "eModule2" }, + new Dictionary + { + ["edgeAgent"] = logOptions1, + ["edgeHub"] = logOptions1, + ["eModule2"] = logOptions3 + } + }; + + yield return new object[] + { + new List<(string id, ModuleLogOptions logOptions)> + { + ("^e.*", logOptions1), + ("mod", logOptions2) + }, + new List { "edgeAgent", "edgeHub", "module1", "eModule2" }, + new Dictionary + { + ["edgeAgent"] = logOptions1, + ["edgeHub"] = logOptions1, + ["eModule2"] = logOptions1, + ["module1"] = logOptions2, + } + }; + } + + public static IEnumerable GetMatchingIdsTestData() + { + yield return new object[] + { + "edge", + new List { "edgehub", "edgeAgent", "module1", "edgMod2" }, + new List { "edgehub", "edgeAgent" }, + }; + + yield return new object[] + { + "e.*t", + new List { "edgehub", "edgeAgent", "module1", "eandt" }, + new List { "edgeAgent", "eandt" }, + }; + + yield return new object[] + { + "EDGE", + new List { "edgehub", "edgeAgent", "module1", "testmod3" }, + new List { "edgehub", "edgeAgent" }, + }; + + yield return new object[] + { + "^e.*", + new List { "edgehub", "edgeAgent", "module1", "eandt" }, + new List { "edgehub", "edgeAgent", "eandt" }, + }; } } } diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/requests/LogsUploadRequestHandlerTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/requests/LogsUploadRequestHandlerTest.cs index 71081e1a1e7..98240b56525 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/requests/LogsUploadRequestHandlerTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/requests/LogsUploadRequestHandlerTest.cs @@ -51,8 +51,8 @@ public async Task TestLogsUploadRequest(string payload, string id, string sasUrl var logsUploader = new Mock(); var logsProvider = new Mock(); var uploadBytes = new byte[100]; - var moduleLogOptions = new ModuleLogOptions(id, contentEncoding, contentType, filter); - logsProvider.Setup(l => l.GetLogs(moduleLogOptions, It.IsAny())) + var moduleLogOptions = new ModuleLogOptions(contentEncoding, contentType, filter); + logsProvider.Setup(l => l.GetLogs(id, moduleLogOptions, It.IsAny())) .ReturnsAsync(uploadBytes); logsUploader.Setup(l => l.Upload(sasUrl, id, uploadBytes, contentEncoding, contentType)) .Returns(Task.CompletedTask); @@ -93,23 +93,23 @@ public async Task TestLogsUploadAllTaskRequest() var logsUploader = new Mock(); var logsProvider = new Mock(); - var module1LogOptions = new ModuleLogOptions(mod1, contentEncoding, contentType, filter); + var module1LogOptions = new ModuleLogOptions(contentEncoding, contentType, filter); var mod1LogBytes = new byte[100]; - logsProvider.Setup(l => l.GetLogs(module1LogOptions, It.IsAny())) + logsProvider.Setup(l => l.GetLogs(mod1, module1LogOptions, It.IsAny())) .ReturnsAsync(mod1LogBytes); logsUploader.Setup(l => l.Upload(sasUrl, mod1, mod1LogBytes, contentEncoding, contentType)) .Returns(Task.CompletedTask); - var module2LogOptions = new ModuleLogOptions(mod2, contentEncoding, contentType, filter); + var module2LogOptions = new ModuleLogOptions(contentEncoding, contentType, filter); var mod2LogBytes = new byte[80]; - logsProvider.Setup(l => l.GetLogs(module2LogOptions, It.IsAny())) + logsProvider.Setup(l => l.GetLogs(mod2, module2LogOptions, It.IsAny())) .ReturnsAsync(mod2LogBytes); logsUploader.Setup(l => l.Upload(sasUrl, mod2, mod2LogBytes, contentEncoding, contentType)) .Returns(Task.CompletedTask); - var module3LogOptions = new ModuleLogOptions(mod3, contentEncoding, contentType, filter); + var module3LogOptions = new ModuleLogOptions(contentEncoding, contentType, filter); var mod3LogBytes = new byte[120]; - logsProvider.Setup(l => l.GetLogs(module3LogOptions, It.IsAny())) + logsProvider.Setup(l => l.GetLogs(mod3, module3LogOptions, It.IsAny())) .ReturnsAsync(mod3LogBytes); logsUploader.Setup(l => l.Upload(sasUrl, mod3, mod3LogBytes, contentEncoding, contentType)) .Returns(Task.CompletedTask); diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs index 07570ba88bd..863f997753b 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs @@ -31,10 +31,12 @@ public async Task HandleTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(id, true, Option.None(), Option.None(), It.IsAny())) .ReturnsAsync(new MemoryStream(buffer)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(new[] { new ModuleRuntimeInfo(id, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) }); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, Mock.Of()); - - var logsStreamRequest = new LogsStreamRequest("1.0", id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); + var logRequestItem = new LogRequestItem(id, ModuleLogFilter.Empty); + var logsStreamRequest = new LogsStreamRequest("1.0", new List { logRequestItem }, LogsContentEncoding.None, LogsContentType.Text); byte[] logsStreamRequestBytes = logsStreamRequest.ToBytes(); var logsStreamRequestArraySeg = new ArraySegment(logsStreamRequestBytes); var clientWebSocket = new Mock(); @@ -74,10 +76,12 @@ public async Task HandleWithCancellationTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(id, true, Option.None(), Option.None(), It.IsAny())) .ReturnsAsync(new MemoryStream(buffer)); + runtimeInfoProvider.Setup(r => r.GetModules(It.IsAny())) + .ReturnsAsync(new[] { new ModuleRuntimeInfo(id, "docker", ModuleStatus.Running, "foo", 0, Option.None(), Option.None()) }); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, Mock.Of()); - - var logsStreamRequest = new LogsStreamRequest("1.0", id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); + var logRequestItem = new LogRequestItem(id, ModuleLogFilter.Empty); + var logsStreamRequest = new LogsStreamRequest("1.0", new List { logRequestItem }, LogsContentEncoding.None, LogsContentType.Text); byte[] logsStreamRequestBytes = logsStreamRequest.ToBytes(); var logsStreamRequestArraySeg = new ArraySegment(logsStreamRequestBytes); var clientWebSocket = new Mock(); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/SingleOrArrayConverter.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/SingleOrArrayConverter.cs new file mode 100644 index 00000000000..5e461dd2166 --- /dev/null +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/SingleOrArrayConverter.cs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Util.Json +{ + using System; + using System.Collections.Generic; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + + public class SingleOrArrayConverter : JsonConverter + { + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + value = value is List list && list.Count == 1 + ? list[0] + : value; + serializer.Serialize(writer, value); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + JToken token = JToken.Load(reader); + return token.Type == JTokenType.Array + ? token.ToObject>() + : new List { token.ToObject() }; + } + + public override bool CanConvert(Type objectType) => objectType == typeof(List); + } +} diff --git a/edge-util/test/Microsoft.Azure.Devices.Edge.Util.Test/json/SingleOrArrayConverterTest.cs b/edge-util/test/Microsoft.Azure.Devices.Edge.Util.Test/json/SingleOrArrayConverterTest.cs new file mode 100644 index 00000000000..47192ac9184 --- /dev/null +++ b/edge-util/test/Microsoft.Azure.Devices.Edge.Util.Test/json/SingleOrArrayConverterTest.cs @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft. All rights reserved. +namespace Microsoft.Azure.Devices.Edge.Util.Test.Json +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Microsoft.Azure.Devices.Edge.Util.Json; + using Microsoft.Azure.Devices.Edge.Util.Test.Common; + using Newtonsoft.Json; + using Xunit; + + [Unit] + public class SingleOrArrayConverterTest + { + public static IEnumerable GetRoundtripData() + { + yield return new object[] + { + new TestClass( + new List { "foo" }, + new List { new PropertyClass("Bar") }), + "{\"items\":\"foo\",\"propertyItems\":{\"testProp\":\"Bar\"}}" + }; + + yield return new object[] + { + new TestClass( + new List { "foo", "foo2" }, + new List { new PropertyClass("Bar"), new PropertyClass("Bar2") }), + "{\"items\":[\"foo\",\"foo2\"],\"propertyItems\":[{\"testProp\":\"Bar\"},{\"testProp\":\"Bar2\"}]}" + }; + } + + [Theory] + [MemberData(nameof(GetRoundtripData))] + public void RoundtripTest(TestClass testObj, string expectedJson) + { + // Act + string json = JsonConvert.SerializeObject(testObj); + + // Assert + Assert.Equal(expectedJson, json); + + // Act + TestClass deserializedObject = JsonConvert.DeserializeObject(json); + + // Assert + Assert.Equal(testObj, deserializedObject); + } + + public class PropertyClass : IEquatable + { + public PropertyClass(string testProp) + { + this.TestProp = testProp; + } + + [JsonProperty("testProp")] + public string TestProp { get; } + + public override bool Equals(object obj) => this.Equals(obj as PropertyClass); + + public bool Equals(PropertyClass other) => other != null && + this.TestProp == other.TestProp; + + public override int GetHashCode() => HashCode.Combine(this.TestProp); + } + + public class TestClass : IEquatable + { + public TestClass(List items, List propertyItems) + { + this.Items = items; + this.PropertyItems = propertyItems; + } + + [JsonProperty("items")] + [JsonConverter(typeof(SingleOrArrayConverter))] + public List Items { get; } + + [JsonProperty("propertyItems")] + [JsonConverter(typeof(SingleOrArrayConverter))] + public List PropertyItems { get; } + + public override bool Equals(object obj) + => this.Equals(obj as TestClass); + + public bool Equals(TestClass other) + => other != null && + !this.Items.Except(other.Items).Any() && + !this.PropertyItems.Except(other.PropertyItems).Any(); + + public override int GetHashCode() => HashCode.Combine(this.Items, this.PropertyItems); + } + } +}