Skip to content

Commit

Permalink
Add filtering support for logs (#967)
Browse files Browse the repository at this point in the history
* Add filter support

* Add support for filter

* Add tests

* Add tests

* Fix build

* Fix build after merge
  • Loading branch information
varunpuranik committed Mar 22, 2019
1 parent e68f759 commit a8cdf8d
Show file tree
Hide file tree
Showing 16 changed files with 424 additions and 39 deletions.
Expand Up @@ -6,6 +6,6 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs

public interface ILogMessageParser
{
ModuleLogMessage Parse(ByteString byteString, string moduleId);
ModuleLogMessageData Parse(ByteString byteString, string moduleId);
}
}
Expand Up @@ -7,8 +7,8 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs

public interface ILogsProcessor
{
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId);
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter);

Task<IReadOnlyList<string>> GetText(Stream stream);
Task<IReadOnlyList<string>> GetText(Stream stream, string moduleId, ModuleLogFilter filter);
}
}
Expand Up @@ -40,16 +40,16 @@ public LogMessageParser(string iotHubName, string deviceId)
this.deviceId = Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId));
}

public ModuleLogMessage Parse(ByteString byteString, string moduleId) =>
public ModuleLogMessageData Parse(ByteString byteString, string moduleId) =>
GetLogMessage(byteString, this.iotHubName, this.deviceId, moduleId);

internal static ModuleLogMessage GetLogMessage(ByteString arg, string iotHubName, string deviceId, string moduleId)
internal static ModuleLogMessageData GetLogMessage(ByteString arg, string iotHubName, string deviceId, string moduleId)
{
string stream = GetStream(arg[0]);
ByteString payload = arg.Slice(8);
string payloadString = payload.ToString(Encoding.UTF8);
(int logLevel, Option<DateTime> timeStamp, string logText) = ParseLogText(payloadString);
var moduleLogMessage = new ModuleLogMessage(iotHubName, deviceId, moduleId, stream, logLevel, timeStamp, logText);
var moduleLogMessage = new ModuleLogMessageData(iotHubName, deviceId, moduleId, stream, logLevel, timeStamp, logText, arg, payloadString);
return moduleLogMessage;
}

Expand Down
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
using akka::Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.IO;
using Microsoft.Azure.Devices.Edge.Util;

// Processes incoming logs stream and converts to the required format
Expand Down Expand Up @@ -41,33 +42,43 @@ public LogsProcessor(ILogMessageParser logMessageParser)
this.materializer = this.system.Materializer();
}

public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId)
public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter)
{
Preconditions.CheckNotNull(stream, nameof(stream));
Preconditions.CheckNotNull(filter, nameof(filter));
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));

var source = StreamConverters.FromInputStream(() => stream);
var seqSink = Sink.Seq<ModuleLogMessage>();
IRunnableGraph<Task<IImmutableList<ModuleLogMessage>>> graph = source
.Via(FramingFlow)
.Select(b => this.logMessageParser.Parse(b, moduleId))
.ToMaterialized(seqSink, Keep.Right);
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId));
filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));
IRunnableGraph<Task<IImmutableList<ModuleLogMessage>>> graph = graphBuilder.GetMaterializingGraph(m => (ModuleLogMessage)m);

IImmutableList<ModuleLogMessage> result = await graph.Run(this.materializer);
return result;
}

public async Task<IReadOnlyList<string>> GetText(Stream stream)
public async Task<IReadOnlyList<string>> GetText(Stream stream, string moduleId, ModuleLogFilter filter)
{
Preconditions.CheckNotNull(stream, nameof(stream));
var source = StreamConverters.FromInputStream(() => stream);
var seqSink = Sink.Seq<string>();
IRunnableGraph<Task<IImmutableList<string>>> graph = source
.Via(FramingFlow)
.Select(b => b.Slice(8))
.Select(b => b.ToString(Encoding.UTF8))
.ToMaterialized(seqSink, Keep.Right);
Preconditions.CheckNotNull(filter, nameof(filter));
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId));

IRunnableGraph<Task<IImmutableList<string>>> GetGraph()
{
if (filter.Regex.HasValue || filter.LogLevel.HasValue)
{
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, moduleId));
filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));
return graphBuilder.GetMaterializingGraph(m => m.FullText);
}
else
{
return GraphBuilder.BuildMaterializedGraph(stream);
}
}

IRunnableGraph<Task<IImmutableList<string>>> graph = GetGraph();
IImmutableList<string> result = await graph.Run(this.materializer);
return result;
}
Expand All @@ -77,5 +88,49 @@ public void Dispose()
this.system?.Dispose();
this.materializer?.Dispose();
}

class GraphBuilder
{
Source<ModuleLogMessageData, Task<IOResult>> parsingGraphSource;

GraphBuilder(Source<ModuleLogMessageData, Task<IOResult>> parsingGraphSource)
{
this.parsingGraphSource = parsingGraphSource;
}

public static GraphBuilder CreateParsingGraphBuilder(Stream stream, Func<ByteString, ModuleLogMessageData> parserFunc)
{
var source = StreamConverters.FromInputStream(() => stream);
var graph = source
.Via(FramingFlow)
.Select(parserFunc);
return new GraphBuilder(graph);
}

public static IRunnableGraph<Task<IImmutableList<string>>> BuildMaterializedGraph(Stream stream)
{
var source = StreamConverters.FromInputStream(() => stream);
var seqSink = Sink.Seq<string>();
IRunnableGraph<Task<IImmutableList<string>>> graph = source
.Via(FramingFlow)
.Select(b => b.Slice(8))
.Select(b => b.ToString(Encoding.UTF8))
.ToMaterialized(seqSink, Keep.Right);
return graph;
}

public void AddFilter(Predicate<ModuleLogMessageData> predicate)
{
this.parsingGraphSource = this.parsingGraphSource.Where(predicate);
}

public IRunnableGraph<Task<IImmutableList<T>>> GetMaterializingGraph<T>(Func<ModuleLogMessageData, T> mapper)
{
var seqSink = Sink.Seq<T>();
return this.parsingGraphSource
.Select(mapper)
.ToMaterialized(seqSink, Keep.Right);
}
}
}
}
Expand Up @@ -24,7 +24,7 @@ public LogsProvider(IRuntimeInfoProvider runtimeInfoProvider, ILogsProcessor log
public async Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, Option.None<int>(), Option.None<int>(), cancellationToken);
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(logOptions.Id);

byte[] logBytes = await this.GetProcessedLogs(logsStream, logOptions);
Expand Down Expand Up @@ -94,11 +94,11 @@ async Task<byte[]> ProcessByContentType(Stream logsStream, ModuleLogOptions logO
switch (logOptions.ContentType)
{
case LogsContentType.Json:
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id);
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id, logOptions.Filter);
return logMessages.ToBytes();

default:
IEnumerable<string> logTexts = await this.logsProcessor.GetText(logsStream);
IEnumerable<string> logTexts = await this.logsProcessor.GetText(logsStream, logOptions.Id, logOptions.Filter);
string logTextString = logTexts.Join(string.Empty);
return logTextString.ToBytes();
}
Expand Down
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System.Text.RegularExpressions;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

public class ModuleLogFilter
{
public ModuleLogFilter(Option<int> tail, Option<int> since, Option<int> logLevel, Option<string> regex)
{
this.Tail = tail;
this.Since = since;
this.LogLevel = logLevel;
this.Regex = regex.Map(r => new Regex(r));
}

[JsonConstructor]
ModuleLogFilter(int? tail, int? since, int? loglevel, string regex)
: this(Option.Maybe(tail), Option.Maybe(since), Option.Maybe(loglevel), Option.Maybe(regex))
{
}

public static ModuleLogFilter Empty = new ModuleLogFilter(Option.None<int>(), Option.None<int>(), Option.None<int>(), Option.None<string>());

public Option<int> Tail { get; }

public Option<int> Since { get; }

public Option<int> LogLevel { get; }

public Option<Regex> Regex { get; }
}
}
@@ -1,14 +1,22 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using System;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Json;
using Newtonsoft.Json;

public class ModuleLogMessage
{
public ModuleLogMessage(string iotHub, string deviceId, string moduleId, string stream, int logLevel, Option<DateTime> timeStamp, string text)
public ModuleLogMessage(
string iotHub,
string deviceId,
string moduleId,
string stream,
int logLevel,
Option<DateTime> timeStamp,
string text)
{
this.IoTHub = Preconditions.CheckNonWhiteSpace(iotHub, nameof(iotHub));
this.DeviceId = Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId));
Expand All @@ -21,7 +29,7 @@ public ModuleLogMessage(string iotHub, string deviceId, string moduleId, string

[JsonConstructor]
ModuleLogMessage(string iotHub, string deviceId, string moduleId, string stream, int logLevel, DateTime? timeStamp, string text)
: this(iotHub, deviceId, moduleId, stream, logLevel, timeStamp.HasValue ? Option.Some(timeStamp.Value) : Option.None<DateTime>(), text)
: this(iotHub, deviceId, moduleId, stream, logLevel, Option.Maybe(timeStamp), text)
{
}

Expand Down
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using System;
using akka::Akka.IO;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

public class ModuleLogMessageData : ModuleLogMessage
{
public ModuleLogMessageData(
string iotHub,
string deviceId,
string moduleId,
string stream,
int logLevel,
Option<DateTime> timeStamp,
string text,
ByteString fullFrame,
string fullText)
: base(iotHub, deviceId, moduleId, stream, logLevel, timeStamp, text)
{
this.FullText = Preconditions.CheckNonWhiteSpace(fullText, fullText);
this.FullFrame = Preconditions.CheckNotNull(fullFrame, nameof(fullFrame));
}

[JsonIgnore]
public ByteString FullFrame { get; }

[JsonIgnore]
public string FullText { get; }
}
}
Expand Up @@ -5,15 +5,17 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs

public class ModuleLogOptions
{
public ModuleLogOptions(string id, LogsContentEncoding contentEncoding, LogsContentType contentType)
public ModuleLogOptions(string id, LogsContentEncoding contentEncoding, LogsContentType contentType, ModuleLogFilter filter)
{
this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id));
this.ContentEncoding = contentEncoding;
this.ContentType = contentType;
this.Filter = Preconditions.CheckNotNull(filter, nameof(filter));
}

public string Id { get; }
public LogsContentEncoding ContentEncoding { get; }
public LogsContentType ContentType { get; }
public ModuleLogFilter Filter { get; }
}
}
Expand Up @@ -23,7 +23,7 @@ public LogsUploadRequestHandler(ILogsUploader logsUploader, ILogsProvider logsPr
protected override async Task<Option<object>> HandleRequestInternal(Option<LogsUploadRequest> payloadOption)
{
LogsUploadRequest payload = payloadOption.Expect(() => new ArgumentException("Request payload not found"));
var moduleLogOptions = new ModuleLogOptions(payload.Id, payload.Encoding, payload.ContentType);
var moduleLogOptions = new ModuleLogOptions(payload.Id, payload.Encoding, payload.ContentType, ModuleLogFilter.Empty);
byte[] logBytes = await this.logsProvider.GetLogs(moduleLogOptions, CancellationToken.None);
await this.logsUploader.Upload(payload.SasUrl, payload.Id, logBytes, payload.Encoding, payload.ContentType);
return Option.None<object>();
Expand Down
Expand Up @@ -28,7 +28,7 @@ public async Task Handle(IClientWebSocket clientWebSocket, CancellationToken can
LogsStreamRequest streamRequest = await this.ReadLogsStreamingRequest(clientWebSocket, cancellationToken);
Events.RequestData(streamRequest);

var logOptions = new ModuleLogOptions(streamRequest.Id, LogsContentEncoding.None, LogsContentType.Text);
var logOptions = new ModuleLogOptions(streamRequest.Id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty);
using (var socketCancellationTokenSource = new CancellationTokenSource())
{
Task ProcessLogsFrame(ArraySegment<byte> bytes)
Expand Down
@@ -0,0 +1,47 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Test.Logs
{
using System;
using System.Collections.Generic;
using System.Text;

public class DockerFraming
{
public static byte[] Frame(IEnumerable<string> logTexts)
{
var outputBytes = new List<byte>();
foreach (string text in logTexts)
{
outputBytes.AddRange(Frame(text));
}

return outputBytes.ToArray();
}

public static byte[] Frame(string text)
{
byte streamByte = 01;
var padding = new byte[3];
var outputBytes = new List<byte>();
byte[] textBytes = Encoding.UTF8.GetBytes(text);
byte[] lenBytes = GetLengthBytes(textBytes.Length);
outputBytes.Add(streamByte);
outputBytes.AddRange(padding);
outputBytes.AddRange(lenBytes);
outputBytes.AddRange(textBytes);
return outputBytes.ToArray();
}

static byte[] GetLengthBytes(int len)
{
byte[] intBytes = BitConverter.GetBytes(len);
if (BitConverter.IsLittleEndian)
{
Array.Reverse(intBytes);
}

byte[] result = intBytes;
return result;
}
}
}

0 comments on commit a8cdf8d

Please sign in to comment.