Skip to content

Commit

Permalink
Add support for streaming logs for multiple modules at the same time (#…
Browse files Browse the repository at this point in the history
…1033)

* Stream all support

* Fix bug and optimize

* Cleanup

* Fix build

* Add/ fix tests

* More tests

* Fix tests

* Add comments

* Add comment

* Separate out LogRequestItem

* Make 8kb to a constant

* Move LogRequestItem to core

* Fix stylecop issues

* Add defaults to LogStreamRequest parameters

* Rebase and fix
  • Loading branch information
varunpuranik committed Apr 19, 2019
1 parent 880df46 commit 5ce1903
Show file tree
Hide file tree
Showing 16 changed files with 637 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs

public interface ILogsProcessor
{
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter);
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(string id, Stream stream, ModuleLogFilter filter);

Task<IReadOnlyList<string>> GetText(Stream stream, string moduleId, ModuleLogFilter filter);
Task<IReadOnlyList<string>> GetText(string id, Stream stream, ModuleLogFilter filter);

Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback);
Task ProcessLogsStream(string id, Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface ILogsProvider
{
Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken);
Task<byte[]> GetLogs(string id, ModuleLogOptions logOptions, CancellationToken cancellationToken);

Task GetLogsStream(ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken);
Task GetLogsStream(string id, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken);

Task GetLogsStream(IList<(string id, ModuleLogOptions logOptions)> ids, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

public class LogRequestItem
{
public LogRequestItem(string id, ModuleLogFilter filter)
{
this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id));
this.Filter = filter ?? ModuleLogFilter.Empty;
}

[JsonProperty("id")]
public string Id { get; }

[JsonProperty("filter")]
public ModuleLogFilter Filter { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public LogsProcessor(ILogMessageParser logMessageParser)
this.materializer = this.system.Materializer();
}

public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter)
public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(string id, Stream stream, ModuleLogFilter filter)
{
Preconditions.CheckNotNull(stream, nameof(stream));
Preconditions.CheckNotNull(filter, nameof(filter));
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));
Preconditions.CheckNonWhiteSpace(id, nameof(id));

GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId));
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id));
filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));
IRunnableGraph<Task<IImmutableList<ModuleLogMessage>>> graph = graphBuilder.GetMaterializingGraph(m => (ModuleLogMessage)m);
Expand All @@ -57,17 +57,17 @@ public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, st
return result;
}

public async Task<IReadOnlyList<string>> GetText(Stream stream, string moduleId, ModuleLogFilter filter)
public async Task<IReadOnlyList<string>> GetText(string id, Stream stream, ModuleLogFilter filter)
{
Preconditions.CheckNotNull(stream, nameof(stream));
Preconditions.CheckNotNull(filter, nameof(filter));
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));
Preconditions.CheckNonWhiteSpace(id, nameof(id));

IRunnableGraph<Task<IImmutableList<string>>> GetGraph()
{
if (filter.Regex.HasValue || filter.LogLevel.HasValue)
{
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId));
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id));
filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));
return graphBuilder.GetMaterializingGraph(m => m.FullText);
Expand All @@ -83,9 +83,8 @@ IRunnableGraph<Task<IImmutableList<string>>> GetGraph()
return result;
}

public async Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback)
public async Task ProcessLogsStream(string id, Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback)
{
string id = logOptions.Id;
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id));
logOptions.Filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
logOptions.Filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Storage;
Expand All @@ -21,27 +24,81 @@ public LogsProvider(IRuntimeInfoProvider runtimeInfoProvider, ILogsProcessor log
this.logsProcessor = Preconditions.CheckNotNull(logsProcessor, nameof(logsProcessor));
}

public async Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken)
public async Task<byte[]> GetLogs(string id, ModuleLogOptions logOptions, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(logOptions.Id);
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, false, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(id);

byte[] logBytes = await this.GetProcessedLogs(logsStream, logOptions);
byte[] logBytes = await this.GetProcessedLogs(id, logsStream, logOptions);
return logBytes;
}

public async Task GetLogsStream(ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken)
// The id parameter is a regex. Logs for all modules that match this regex are processed.
public async Task GetLogsStream(string id, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Preconditions.CheckNotNull(callback, nameof(callback));

Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(logOptions.Id);
IEnumerable<string> modules = (await this.runtimeInfoProvider.GetModules(cancellationToken))
.Select(m => m.Name);
ISet<string> matchingIds = GetMatchingIds(id, modules);
Events.StreamingLogs(matchingIds, logOptions);
IEnumerable<Task> streamingTasks = matchingIds.Select(i => this.GetLogsStreamInternal(i, logOptions, callback, cancellationToken));
await Task.WhenAll(streamingTasks);
}

await (NeedToProcessStream(logOptions)
? this.logsProcessor.ProcessLogsStream(logsStream, logOptions, callback)
: this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken));
// The id parameter in the ids is a regex. Logs for all modules that match this regex are processed.
// If multiple id parameters match a module, the first one is considered.
public async Task GetLogsStream(IList<(string id, ModuleLogOptions logOptions)> ids, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(ids, nameof(ids));
Preconditions.CheckNotNull(callback, nameof(callback));

IList<string> modules = (await this.runtimeInfoProvider.GetModules(cancellationToken))
.Select(m => m.Name)
.ToList();

IDictionary<string, ModuleLogOptions> idsToProcess = GetIdsToProcess(ids, modules);
Events.StreamingLogs(idsToProcess);
IEnumerable<Task> streamingTasks = idsToProcess.Select(kvp => this.GetLogsStreamInternal(kvp.Key, kvp.Value, callback, cancellationToken));
await Task.WhenAll(streamingTasks);
}

internal static IDictionary<string, ModuleLogOptions> GetIdsToProcess(IList<(string id, ModuleLogOptions logOptions)> idList, IList<string> allIds)
{
var idsToProcess = new Dictionary<string, ModuleLogOptions>(StringComparer.OrdinalIgnoreCase);
foreach ((string regex, ModuleLogOptions logOptions) in idList)
{
ISet<string> ids = GetMatchingIds(regex, allIds);
if (ids.Count == 0)
{
Events.NoMatchingModule(regex, allIds);
}
else
{
foreach (string id in ids)
{
if (!idsToProcess.ContainsKey(id))
{
idsToProcess[id] = logOptions;
}
}
}
}

return idsToProcess;
}

internal static ISet<string> GetMatchingIds(string id, IEnumerable<string> ids)
{
if (!id.Equals(Constants.AllModulesIdentifier, StringComparison.OrdinalIgnoreCase))
{
var regex = new Regex(id, RegexOptions.IgnoreCase);
ids = ids.Where(m => regex.IsMatch(m));
}

return ids.ToImmutableHashSet();
}

internal static bool NeedToProcessStream(ModuleLogOptions logOptions) =>
Expand All @@ -50,6 +107,19 @@ public async Task GetLogsStream(ModuleLogOptions logOptions, Func<ArraySegment<b
|| logOptions.ContentEncoding != LogsContentEncoding.None
|| logOptions.ContentType != LogsContentType.Text;

internal async Task GetLogsStreamInternal(string id, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Preconditions.CheckNotNull(callback, nameof(callback));

Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(id);

await (NeedToProcessStream(logOptions)
? this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback)
: this.WriteLogsStreamToOutput(id, callback, logsStream, cancellationToken));
}

static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) =>
contentEncoding == LogsContentEncoding.Gzip
? Compression.CompressToGzip(bytes)
Expand Down Expand Up @@ -85,23 +155,23 @@ async Task WriteLogsStreamToOutput(string id, Func<ArraySegment<byte>, Task> cal
}
}

async Task<byte[]> GetProcessedLogs(Stream logsStream, ModuleLogOptions logOptions)
async Task<byte[]> GetProcessedLogs(string id, Stream logsStream, ModuleLogOptions logOptions)
{
byte[] logBytes = await this.ProcessByContentType(logsStream, logOptions);
byte[] logBytes = await this.ProcessByContentType(id, logsStream, logOptions);
logBytes = ProcessByContentEncoding(logBytes, logOptions.ContentEncoding);
return logBytes;
}

async Task<byte[]> ProcessByContentType(Stream logsStream, ModuleLogOptions logOptions)
async Task<byte[]> ProcessByContentType(string id, Stream logsStream, ModuleLogOptions logOptions)
{
switch (logOptions.ContentType)
{
case LogsContentType.Json:
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id, logOptions.Filter);
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(id, logsStream, logOptions.Filter);
return logMessages.ToBytes();

default:
IEnumerable<string> logTexts = await this.logsProcessor.GetText(logsStream, logOptions.Id, logOptions.Filter);
IEnumerable<string> logTexts = await this.logsProcessor.GetText(id, logsStream, logOptions.Filter);
string logTextString = logTexts.Join(string.Empty);
return logTextString.ToBytes();
}
Expand All @@ -117,7 +187,9 @@ enum EventIds
StreamingCancelled = IdStart,
ErrorWhileStreaming,
ReceivedStream,
StreamingCompleted
StreamingCompleted,
StreamingLogs,
NoMatchingModule
}

public static void ErrorWhileProcessingStream(string id, Exception ex)
Expand All @@ -140,6 +212,23 @@ public static void StreamingCompleted(string id)
{
Log.LogInformation((int)EventIds.StreamingCompleted, $"Completed streaming logs for {id}");
}

public static void StreamingLogs(IDictionary<string, ModuleLogOptions> idsToProcess)
{
Log.LogDebug((int)EventIds.StreamingLogs, $"Streaming logs for {idsToProcess.ToJson()}");
}

public static void NoMatchingModule(string regex, IList<string> allIds)
{
string idsString = allIds.Join(", ");
Log.LogWarning((int)EventIds.NoMatchingModule, $"The regex {regex} in the log stream request did not match any of the modules - {idsString}");
}

internal static void StreamingLogs(ISet<string> ids, ModuleLogOptions logOptions)
{
string idsString = ids.Join(",");
Log.LogDebug((int)EventIds.StreamingLogs, $"Streaming logs for {idsString} with options {logOptions.ToJson()}");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs

public class ModuleLogOptions : IEquatable<ModuleLogOptions>
{
public ModuleLogOptions(string id, LogsContentEncoding contentEncoding, LogsContentType contentType, ModuleLogFilter filter)
public ModuleLogOptions(LogsContentEncoding contentEncoding, LogsContentType contentType, ModuleLogFilter filter)
{
this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id));
this.ContentEncoding = contentEncoding;
this.ContentType = contentType;
this.Filter = Preconditions.CheckNotNull(filter, nameof(filter));
}

public string Id { get; }

public LogsContentEncoding ContentEncoding { get; }

public LogsContentType ContentType { get; }
Expand All @@ -28,15 +25,13 @@ public override bool Equals(object obj)

public bool Equals(ModuleLogOptions other)
=> other != null &&
this.Id == other.Id &&
this.ContentEncoding == other.ContentEncoding &&
this.ContentType == other.ContentType &&
EqualityComparer<ModuleLogFilter>.Default.Equals(this.Filter, other.Filter);

public override int GetHashCode()
{
var hashCode = -1683996196;
hashCode = hashCode * -1521134295 + EqualityComparer<string>.Default.GetHashCode(this.Id);
hashCode = hashCode * -1521134295 + this.ContentEncoding.GetHashCode();
hashCode = hashCode * -1521134295 + this.ContentType.GetHashCode();
hashCode = hashCode * -1521134295 + EqualityComparer<ModuleLogFilter>.Default.GetHashCode(this.Filter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ async Task<IList<string>> GetModuleIds()
}
}

var moduleLogOptions = new ModuleLogOptions(payload.Encoding, payload.ContentType, payload.Filter);
IList<string> moduleIds = await GetModuleIds();
IEnumerable<Task> uploadTasks = moduleIds.Select(m => this.UploadLogs(payload.SasUrl, new ModuleLogOptions(m, payload.Encoding, payload.ContentType, payload.Filter), cancellationToken));
IEnumerable<Task> uploadTasks = moduleIds.Select(m => this.UploadLogs(payload.SasUrl, m, moduleLogOptions, cancellationToken));
await Task.WhenAll(uploadTasks);
return Option.None<object>();
}

async Task UploadLogs(string sasUrl, ModuleLogOptions moduleLogOptions, CancellationToken token)
async Task UploadLogs(string sasUrl, string id, ModuleLogOptions moduleLogOptions, CancellationToken token)
{
byte[] logBytes = await this.logsProvider.GetLogs(moduleLogOptions, token);
await this.logsUploader.Upload(sasUrl, moduleLogOptions.Id, logBytes, moduleLogOptions.ContentEncoding, moduleLogOptions.ContentType);
byte[] logBytes = await this.logsProvider.GetLogs(id, moduleLogOptions, token);
await this.logsUploader.Upload(sasUrl, id, logBytes, moduleLogOptions.ContentEncoding, moduleLogOptions.ContentType);
}
}
}
Loading

0 comments on commit 5ce1903

Please sign in to comment.