From 66ed44e0998e4b24e5db28f0386b7787661d746e Mon Sep 17 00:00:00 2001 From: Andrey Stolyarov Date: Tue, 18 Mar 2025 23:45:47 +0300 Subject: [PATCH 1/4] Implement --format-in csv --- .../scala/ru/d10xa/jsonlogviewer/App.scala | 18 +- .../d10xa/jsonlogviewer/LogViewerStream.scala | 156 ++++++++++++++---- .../jsonlogviewer/csv/CsvLogLineParser.scala | 74 +++++++++ .../d10xa/jsonlogviewer/csv/CsvParser.scala | 33 ++++ .../d10xa/jsonlogviewer/decline/Config.scala | 4 +- .../jsonlogviewer/decline/DeclineOpts.scala | 2 +- .../decline/FormatInValidator.scala | 1 + .../csv/CsvLogLineParserTest.scala | 99 +++++++++++ 8 files changed, 354 insertions(+), 33 deletions(-) create mode 100644 json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala create mode 100644 json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvParser.scala create mode 100644 json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala diff --git a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/App.scala b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/App.scala index a46886d..76289ca 100644 --- a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/App.scala +++ b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/App.scala @@ -17,6 +17,7 @@ import ru.d10xa.jsonlogviewer.Router0.ViewPage import ru.d10xa.jsonlogviewer.Router0.navigateTo import ru.d10xa.jsonlogviewer.decline.Config import ru.d10xa.jsonlogviewer.decline.Config.FormatIn +import ru.d10xa.jsonlogviewer.decline.Config.FormatIn.Csv import ru.d10xa.jsonlogviewer.decline.Config.FormatIn.Json import ru.d10xa.jsonlogviewer.decline.Config.FormatIn.Logfmt import ru.d10xa.jsonlogviewer.decline.Config.FormatOut @@ -44,6 +45,16 @@ object App { |@timestamp=2023-09-18T19:10:10.123456Z second line {"level":"INFO"} |""".stripMargin + val csvSample: String = + """@timestamp,level,logger_name,thread_name,message + |2023-09-18T19:10:10.123456Z,INFO,MakeLogs,main,"first line, with comma" + |2023-09-18T19:11:20.132318Z,INFO,MakeLogs,main,test + |2023-09-18T19:12:30.132319Z,DEBUG,MakeLogs,main,debug msg + |2023-09-18T19:13:42.132321Z,WARN,MakeLogs,main,warn msg + |2023-09-18T19:14:42.137207Z,ERROR,MakeLogs,main,"error message,error details" + |2023-09-18T19:15:42.137207Z,INFO,MakeLogs,main,last line + |""".stripMargin + val textVar: Var[String] = Var("") val cliVar: Var[String] = Var( @@ -132,13 +143,16 @@ object App { value <-- formatInVar.signal.map { case FormatIn.Json => "json" case FormatIn.Logfmt => "logfmt" + case FormatIn.Csv => "csv" }, onChange.mapToValue.map { case "json" => FormatIn.Json case "logfmt" => FormatIn.Logfmt + case "csv" => FormatIn.Csv } --> formatInVar, option(value := "json", "json"), - option(value := "logfmt", "logfmt") + option(value := "logfmt", "logfmt"), + option(value := "csv", "csv") ) ) def formatOutDiv: ReactiveHtmlElement[HTMLDivElement] = div( @@ -196,11 +210,13 @@ object App { child.text <-- formatInVar.signal.map { case Logfmt => "Generate logfmt logs" case Json => "Generate json logs" + case Csv => "Generate csv logs" }, onClick --> { _ => formatInVar.now() match case Config.FormatIn.Json => textVar.set(jsonLogSample) case Config.FormatIn.Logfmt => textVar.set(logfmtSample) + case Config.FormatIn.Csv => textVar.set(csvSample) } ) private def renderLivePage(): HtmlElement = { diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala 
b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala index 18f35a7..c6a9494 100644 --- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala +++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala @@ -3,6 +3,8 @@ package ru.d10xa.jsonlogviewer import cats.effect.IO import cats.effect.Ref import fs2.* +import fs2.Pull +import ru.d10xa.jsonlogviewer.csv.CsvLogLineParser import ru.d10xa.jsonlogviewer.decline.yaml.ConfigYaml import ru.d10xa.jsonlogviewer.decline.yaml.Feed import ru.d10xa.jsonlogviewer.decline.Config @@ -35,16 +37,33 @@ object LogViewerStream { val feedStreams = feeds.zipWithIndex.map { (feed, index) => val feedStream: Stream[IO, String] = commandsAndInlineInputToStream(feed.commands, feed.inlineInput) - processStream( + + // First line of csv is header + val formatIn = feed.formatIn.orElse(config.formatIn) + if (formatIn.contains(FormatIn.Csv)) { + processStreamWithCsvHeader( + config, + feedStream, + configYamlRef, + index + ) + } else { + processStream(config, feedStream, configYamlRef, index) + } + } + Stream.emits(feedStreams).parJoin(feedStreams.size) + case None => + // First line of csv is header + if (config.formatIn.contains(FormatIn.Csv)) { + processStreamWithCsvHeader( config, - feedStream, + stdinLinesStream, configYamlRef, - index + -1 ) + } else { + processStream(config, stdinLinesStream, configYamlRef, -1) } - Stream.emits(feedStreams).parJoin(feedStreams.size) - case None => - processStream(config, stdinLinesStream, configYamlRef, -1) } finalStream @@ -52,25 +71,85 @@ object LogViewerStream { .append(Stream.emit("\n")) } - private def commandsAndInlineInputToStream( - commands: List[String], - inlineInput: Option[String] + private def processStreamWithCsvHeader( + config: Config, + lines: Stream[IO, String], + configYamlRef: Ref[IO, Option[ConfigYaml]], + index: Int ): Stream[IO, String] = - new ShellImpl().mergeCommandsAndInlineInput(commands, inlineInput) + lines.pull.uncons1.flatMap { + case Some((headerLine, rest)) => + val csvHeaderParser = CsvLogLineParser(config, headerLine) + Stream + .emit(csvHeaderParser) + .flatMap { parser => + processStreamWithParser(config, rest, configYamlRef, index, parser) + } + .pull + .echo + case None => + Pull.done + }.stream - def makeLogLineParser( + private def processStreamWithParser( config: Config, - optFormatIn: Option[FormatIn] - ): LogLineParser = { - val jsonPrefixPostfix = JsonPrefixPostfix(JsonDetector()) - optFormatIn match { - case Some(FormatIn.Logfmt) => LogfmtLogLineParser(config) - case _ => JsonLogLineParser(config, jsonPrefixPostfix) - } - } + lines: Stream[IO, String], + configYamlRef: Ref[IO, Option[ConfigYaml]], + index: Int, + logLineParser: LogLineParser + ): Stream[IO, String] = + for { + line <- lines + optConfigYaml <- Stream.eval(configYamlRef.get) + filter = optConfigYaml + .flatMap(_.feeds) + .flatMap(_.lift(index).flatMap(_.filter)) + .orElse(config.filter) + rawInclude = optConfigYaml + .flatMap(_.feeds) + .flatMap(_.lift(index).flatMap(_.rawInclude)) + rawExclude = optConfigYaml + .flatMap(_.feeds) + .flatMap(_.lift(index).flatMap(_.rawExclude)) + feedName = optConfigYaml + .flatMap(_.feeds) + .flatMap(_.lift(index).flatMap(_.name)) + effectiveConfig = config.copy( + filter = filter + ) + timestampFilter = TimestampFilter() + parseResultKeys = ParseResultKeys(effectiveConfig) + logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys) + outputLineFormatter = effectiveConfig.formatOut match 
+ case Some(Config.FormatOut.Raw) => RawFormatter() + case Some(Config.FormatOut.Pretty) | None => + ColorLineFormatter(effectiveConfig, feedName) + evaluatedLine <- Stream + .emit(line) + .filter(rawFilter(_, rawInclude, rawExclude)) + .map(logLineParser.parse) + .filter(logLineFilter.grep) + .filter(logLineFilter.logLineQueryPredicate) + .through( + timestampFilter.filterTimestampAfter(effectiveConfig.timestamp.after) + ) + .through( + timestampFilter.filterTimestampBefore( + effectiveConfig.timestamp.before + ) + ) + .map(pr => + Try(outputLineFormatter.formatLine(pr)) match { + case Success(formatted) => formatted.toString + case Failure(_) => pr.raw + } + ) + } yield evaluatedLine + /** Processing for non-csv files + */ private def processStream( - baseConfig: Config, + config: Config, lines: Stream[IO, String], configYamlRef: Ref[IO, Option[ConfigYaml]], index: Int @@ -81,11 +160,11 @@ object LogViewerStream { formatIn = optConfigYaml .flatMap(_.feeds) .flatMap(_.lift(index).flatMap(_.formatIn)) - .orElse(baseConfig.formatIn) + .orElse(config.formatIn) filter = optConfigYaml .flatMap(_.feeds) .flatMap(_.lift(index).flatMap(_.filter)) - .orElse(baseConfig.filter) + .orElse(config.filter) rawInclude = optConfigYaml .flatMap(_.feeds) .flatMap(_.lift(index).flatMap(_.rawInclude)) @@ -95,14 +174,14 @@ object LogViewerStream { feedName = optConfigYaml .flatMap(_.feeds) .flatMap(_.lift(index).flatMap(_.name)) - effectiveConfig = baseConfig.copy( + effectiveConfig = config.copy( filter = filter, formatIn = formatIn ) timestampFilter = TimestampFilter() parseResultKeys = ParseResultKeys(effectiveConfig) logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys) - logLineParser = makeLogLineParser(effectiveConfig, formatIn) + logLineParser = makeNonCsvLogLineParser(effectiveConfig, formatIn) outputLineFormatter = effectiveConfig.formatOut match case Some(Config.FormatOut.Raw) => RawFormatter() case Some(Config.FormatOut.Pretty) | None => @@ -127,21 +206,42 @@ object LogViewerStream { case Failure(_) => pr.raw } ) - .map(_.toString) } yield evaluatedLine + private def commandsAndInlineInputToStream( + commands: List[String], + inlineInput: Option[String] + ): Stream[IO, String] = + new ShellImpl().mergeCommandsAndInlineInput(commands, inlineInput) + + def makeNonCsvLogLineParser( + config: Config, + optFormatIn: Option[FormatIn] + ): LogLineParser = { + val jsonPrefixPostfix = JsonPrefixPostfix(JsonDetector()) + optFormatIn match { + case Some(FormatIn.Logfmt) => LogfmtLogLineParser(config) + case Some(FormatIn.Csv) => + throw new IllegalStateException( + "method makeNonCsvLogLineParserCSV not support csv" + ) + case _ => JsonLogLineParser(config, jsonPrefixPostfix) + } + } + def rawFilter( str: String, include: Option[List[String]], exclude: Option[List[String]] ): Boolean = { - val includeRegexes: List[Regex] = include.getOrElse(Nil).map(_.r) - val excludeRegexes: List[Regex] = exclude.getOrElse(Nil).map(_.r) + val includeRegexes: List[Regex] = + include.getOrElse(Nil).map(_.r) + val excludeRegexes: List[Regex] = + exclude.getOrElse(Nil).map(_.r) val includeMatches = includeRegexes.isEmpty || includeRegexes.exists( _.findFirstIn(str).isDefined ) val excludeMatches = excludeRegexes.forall(_.findFirstIn(str).isEmpty) includeMatches && excludeMatches } - } diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala new file mode 100644 index 0000000..2a1a60a --- 
/dev/null
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala
@@ -0,0 +1,74 @@
+package ru.d10xa.jsonlogviewer.csv
+
+import ru.d10xa.jsonlogviewer.LogLineParser
+import ru.d10xa.jsonlogviewer.ParseResult
+import ru.d10xa.jsonlogviewer.ParsedLine
+import ru.d10xa.jsonlogviewer.decline.Config
+import ru.d10xa.jsonlogviewer.HardcodedFieldNames.*
+
+class CsvLogLineParser(config: Config, headers: List[String]) extends LogLineParser {
+  private val csvParser = new CsvParser()
+  private val timestampFieldName: String = config.timestamp.fieldName
+
+  private val knownFieldNames: Seq[String] = Seq(
+    timestampFieldName,
+    levelFieldName,
+    messageFieldName,
+    stackTraceFieldName,
+    loggerNameFieldName,
+    threadNameFieldName
+  )
+
+  private val headerIndices: Map[String, Int] = headers.zipWithIndex.toMap
+
+  override def parse(s: String): ParseResult = {
+    val values = csvParser.parseLine(s)
+
+    val fieldsMap = headerIndices.flatMap { case (header, index) =>
+      if (index < values.size) Some(header -> values(index))
+      else None
+    }
+
+    val timestamp = fieldsMap.get(timestampFieldName)
+    val level = fieldsMap.get(levelFieldName)
+    val message = fieldsMap.get(messageFieldName)
+    val stackTrace = fieldsMap.get(stackTraceFieldName)
+    val loggerName = fieldsMap.get(loggerNameFieldName)
+    val threadName = fieldsMap.get(threadNameFieldName)
+
+    val otherAttributes = fieldsMap.view.filterKeys(!knownFieldNames.contains(_)).toMap
+
+    ParseResult(
+      raw = s,
+      parsed = Some(
+        ParsedLine(
+          timestamp = timestamp,
+          level = level,
+          message = message,
+          stackTrace = stackTrace,
+          loggerName = loggerName,
+          threadName = threadName,
+          otherAttributes = otherAttributes
+        )
+      ),
+      middle = "",
+      prefix = None,
+      postfix = None
+    )
+  }
+}
+
+object CsvLogLineParser {
+  /**
+   * Creates a CSV log parser based on a header line.
+   *
+   * @param config configuration
+   * @param headerLine CSV header line
+   * @return CSV log parser
+   */
+  def apply(config: Config, headerLine: String): CsvLogLineParser = {
+    val csvParser = new CsvParser()
+    val headers = csvParser.parseLine(headerLine)
+    new CsvLogLineParser(config, headers)
+  }
+}
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvParser.scala
new file mode 100644
index 0000000..1d31c04
--- /dev/null
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvParser.scala
@@ -0,0 +1,33 @@
+package ru.d10xa.jsonlogviewer.csv
+
+class CsvParser {
+
+  def parseLine(line: String, delimiter: Char = ','): List[String] = {
+    val result = scala.collection.mutable.ListBuffer[String]()
+    val currentField = new StringBuilder
+    var inQuotes = false
+    var i = 0
+
+    while (i < line.length) {
+      val c = line(i)
+      if (c == '"') {
+        if (inQuotes && i + 1 < line.length && line(i + 1) == '"') {
+          currentField.append('"')
+          i += 1
+        } else {
+          inQuotes = !inQuotes
+        }
+      } else if (c == delimiter && !inQuotes) {
+        result += currentField.toString
+        currentField.clear()
+      } else {
+        currentField.append(c)
+      }
+      i += 1
+    }
+
+    result += currentField.toString
+
+    result.toList
+  }
+}
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala
index 88315c8..9347c3c 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/Config.scala
@@ -18,9 +18,7 @@ object Config:
   final case class ConfigGrep(key: String, value: Regex)
 
   enum FormatIn:
-    case Json, Logfmt
+    case Json, Logfmt, Csv
 
   enum FormatOut:
     case Pretty, Raw
-
-end Config
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala
index acdac00..6c31d61 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/DeclineOpts.scala
@@ -43,7 +43,7 @@ object DeclineOpts {
     .orNone
 
   val formatIn: Opts[Option[FormatIn]] = Opts
-    .option[String]("format-in", help = "json, logfmt")
+    .option[String]("format-in", help = "json, logfmt, csv")
    .mapValidated(FormatInValidator.toValidatedFormatIn)
    .orNone
 
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FormatInValidator.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FormatInValidator.scala
index de33361..588399f 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FormatInValidator.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/FormatInValidator.scala
@@ -10,5 +10,6 @@ object FormatInValidator {
   ): Validated[NonEmptyList[String], FormatIn] = str match
     case "json" => Validated.valid(FormatIn.Json)
     case "logfmt" => Validated.valid(FormatIn.Logfmt)
+    case "csv" => Validated.valid(FormatIn.Csv)
     case other => Validated.invalidNel(s"Wrong format: $other")
 }
diff --git a/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala
new file mode 100644
index
0000000..5a6a43e --- /dev/null +++ b/json-log-viewer/shared/src/test/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParserTest.scala @@ -0,0 +1,99 @@ +package ru.d10xa.jsonlogviewer.csv + +import munit.FunSuite +import ru.d10xa.jsonlogviewer.decline.Config +import ru.d10xa.jsonlogviewer.decline.TimestampConfig + +class CsvLogLineParserTest extends FunSuite { + val config: Config = Config( + configFile = None, + timestamp = TimestampConfig( + fieldName = "@timestamp", + after = None, + before = None + ), + grep = List.empty, + filter = None, + formatIn = Some(Config.FormatIn.Csv), + formatOut = None + ) + + test("parse CSV log line with standard headers") { + val headerLine = "@timestamp,level,message,logger_name,thread_name,stack_trace" + val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger,main,null" + + val parser = CsvLogLineParser(config, headerLine) + val result = parser.parse(logLine) + + assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals(result.parsed.flatMap(_.level), Some("INFO")) + assertEquals(result.parsed.flatMap(_.message), Some("Test message")) + assertEquals(result.parsed.flatMap(_.loggerName), Some("Logger")) + assertEquals(result.parsed.flatMap(_.threadName), Some("main")) + assertEquals(result.parsed.flatMap(_.stackTrace), Some("null")) + assert(result.parsed.exists(_.otherAttributes.isEmpty)) + } + + test("parse CSV log line with custom headers") { + val headerLine = "@timestamp,level,message,custom_field" + val logLine = "2023-01-01T12:00:00Z,INFO,Test message,custom value" + + val parser = CsvLogLineParser(config, headerLine) + val result = parser.parse(logLine) + + assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals(result.parsed.flatMap(_.level), Some("INFO")) + assertEquals(result.parsed.flatMap(_.message), Some("Test message")) + assertEquals(result.parsed.exists(_.otherAttributes.contains("custom_field")), true) + assertEquals(result.parsed.flatMap(_.otherAttributes.get("custom_field")), Some("custom value")) + } + + test("parse CSV log line with spaces in header") { + val headerLine = "\"@timestamp\",\"log level\",\"message text\",\"logger name\"" + val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger" + + val parser = CsvLogLineParser(config, headerLine) + val result = parser.parse(logLine) + + assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + assertEquals(result.parsed.flatMap(_.level), None) + assertEquals(result.parsed.flatMap(_.message), None) + assertEquals(result.parsed.flatMap(_.loggerName), None) + + assertEquals(result.parsed.exists(_.otherAttributes.contains("log level")), true) + assertEquals(result.parsed.flatMap(_.otherAttributes.get("log level")), Some("INFO")) + assertEquals(result.parsed.exists(_.otherAttributes.contains("message text")), true) + assertEquals(result.parsed.flatMap(_.otherAttributes.get("message text")), Some("Test message")) + assertEquals(result.parsed.exists(_.otherAttributes.contains("logger name")), true) + assertEquals(result.parsed.flatMap(_.otherAttributes.get("logger name")), Some("Logger")) + } + + test("parse CSV log line with missing fields") { + val headerLine = "@timestamp,level,message,logger_name,thread_name,stack_trace,custom_field" + val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Logger" + + val parser = CsvLogLineParser(config, headerLine) + val result = parser.parse(logLine) + + assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z")) + 
assertEquals(result.parsed.flatMap(_.level), Some("INFO"))
+    assertEquals(result.parsed.flatMap(_.message), Some("Test message"))
+    assertEquals(result.parsed.flatMap(_.loggerName), Some("Logger"))
+    assertEquals(result.parsed.flatMap(_.threadName), None)
+    assertEquals(result.parsed.flatMap(_.stackTrace), None)
+    assertEquals(result.parsed.exists(_.otherAttributes.contains("custom_field")), false)
+  }
+
+  test("parse CSV log line with more values than headers") {
+    val headerLine = "@timestamp,level,message"
+    val logLine = "2023-01-01T12:00:00Z,INFO,Test message,Extra,Values"
+
+    val parser = CsvLogLineParser(config, headerLine)
+    val result = parser.parse(logLine)
+
+    assertEquals(result.parsed.flatMap(_.timestamp), Some("2023-01-01T12:00:00Z"))
+    assertEquals(result.parsed.flatMap(_.level), Some("INFO"))
+    assertEquals(result.parsed.flatMap(_.message), Some("Test message"))
+    assert(result.parsed.exists(_.otherAttributes.isEmpty))
+  }
+}
\ No newline at end of file

From 25065b9d4e036bdff80425cd05b4ff6881398f22 Mon Sep 17 00:00:00 2001
From: Andrey Stolyarov
Date: Tue, 18 Mar 2025 23:51:38 +0300
Subject: [PATCH 2/4] Refactor LogViewerStream

---
 .../d10xa/jsonlogviewer/LogViewerStream.scala | 217 ++++++++----------
 1 file changed, 93 insertions(+), 124 deletions(-)

diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
index c6a9494..ab29ef1 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
@@ -38,32 +38,23 @@ object LogViewerStream {
         val feedStream: Stream[IO, String] =
           commandsAndInlineInputToStream(feed.commands, feed.inlineInput)
 
-        // First line of csv is header
-        val formatIn = feed.formatIn.orElse(config.formatIn)
-        if (formatIn.contains(FormatIn.Csv)) {
-          processStreamWithCsvHeader(
-            config,
-            feedStream,
-            configYamlRef,
-            index
-          )
-        } else {
-          processStream(config, feedStream, configYamlRef, index)
-        }
+        createProcessStream(
+          config = config,
+          lines = feedStream,
+          configYamlRef = configYamlRef,
+          index = index,
+          initialFormatIn = feed.formatIn.orElse(config.formatIn)
+        )
       }
       Stream.emits(feedStreams).parJoin(feedStreams.size)
     case None =>
-      // First line of csv is header
-      if (config.formatIn.contains(FormatIn.Csv)) {
-        processStreamWithCsvHeader(
-          config,
-          stdinLinesStream,
-          configYamlRef,
-          -1
-        )
-      } else {
-        processStream(config, stdinLinesStream, configYamlRef, -1)
-      }
+      createProcessStream(
+        config = config,
+        lines = stdinLinesStream,
+        configYamlRef = configYamlRef,
+        index = -1,
+        initialFormatIn = config.formatIn
+      )
     }
 
     finalStream
@@ -71,85 +62,73 @@
       .append(Stream.emit("\n"))
   }
 
-  private def processStreamWithCsvHeader(
+  private def createProcessStream(
     config: Config,
     lines: Stream[IO, String],
     configYamlRef: Ref[IO, Option[ConfigYaml]],
-    index: Int
+    index: Int,
+    initialFormatIn: Option[FormatIn]
   ): Stream[IO, String] =
-    lines.pull.uncons1.flatMap {
-      case Some((headerLine, rest)) =>
-        val csvHeaderParser = CsvLogLineParser(config, headerLine)
-        Stream
-          .emit(csvHeaderParser)
-          .flatMap { parser =>
-            processStreamWithParser(config, rest, configYamlRef, index, parser)
-          }
-          .pull
-          .echo
-      case None =>
-        Pull.done
-    }.stream
+    // Handle CSV separately because it needs a header line
+    if (initialFormatIn.contains(FormatIn.Csv)) {
+      lines.pull.uncons1.flatMap
{ + case Some((headerLine, rest)) => + val csvHeaderParser = CsvLogLineParser(config, headerLine) + processStreamWithEffectiveConfig( + config = config, + lines = rest, + configYamlRef = configYamlRef, + index = index, + parser = Some(csvHeaderParser) + ).pull.echo + case None => + Pull.done + }.stream + } else { + processStreamWithEffectiveConfig( + config = config, + lines = lines, + configYamlRef = configYamlRef, + index = index, + parser = None + ) + } - private def processStreamWithParser( + private def processStreamWithEffectiveConfig( config: Config, lines: Stream[IO, String], configYamlRef: Ref[IO, Option[ConfigYaml]], index: Int, - logLineParser: LogLineParser + parser: Option[LogLineParser] ): Stream[IO, String] = for { line <- lines optConfigYaml <- Stream.eval(configYamlRef.get) - filter = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.filter)) - .orElse(config.filter) - rawInclude = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.rawInclude)) - rawExclude = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.rawExclude)) - feedName = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.name)) + + feedConfig = extractFeedConfig(optConfigYaml, index) + effectiveConfig = config.copy( - filter = filter + filter = feedConfig.filter.orElse(config.filter), + formatIn = feedConfig.formatIn.orElse(config.formatIn) ) + timestampFilter = TimestampFilter() parseResultKeys = ParseResultKeys(effectiveConfig) logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys) - outputLineFormatter = effectiveConfig.formatOut match - case Some(Config.FormatOut.Raw) => RawFormatter() - case Some(Config.FormatOut.Pretty) | None => - ColorLineFormatter(effectiveConfig, feedName) - evaluatedLine <- Stream - .emit(line) - .filter(rawFilter(_, rawInclude, rawExclude)) - .map(logLineParser.parse) - .filter(logLineFilter.grep) - .filter(logLineFilter.logLineQueryPredicate) - .through( - timestampFilter.filterTimestampAfter(effectiveConfig.timestamp.after) - ) - .through( - timestampFilter.filterTimestampBefore( - effectiveConfig.timestamp.before - ) - ) - .map(pr => - Try(outputLineFormatter.formatLine(pr)) match { - case Success(formatted) => formatted.toString - case Failure(_) => pr.raw - } - ) - } yield evaluatedLine - /** Processing for non-csv files - */ - private def processStream( - config: Config, - lines: Stream[IO, String], - configYamlRef: Ref[IO, Option[ConfigYaml]], - index: Int - ): Stream[IO, String] = - for { - line <- lines - optConfigYaml <- Stream.eval(configYamlRef.get) - formatIn = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.formatIn)) - .orElse(config.formatIn) - filter = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.filter)) - .orElse(config.filter) - rawInclude = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.rawInclude)) - rawExclude = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.rawExclude)) - feedName = optConfigYaml - .flatMap(_.feeds) - .flatMap(_.lift(index).flatMap(_.name)) - effectiveConfig = config.copy( - filter = filter, - formatIn = formatIn + logLineParser = parser.getOrElse( + makeNonCsvLogLineParser(effectiveConfig, feedConfig.formatIn) ) - timestampFilter = TimestampFilter() - parseResultKeys = ParseResultKeys(effectiveConfig) - logLineFilter = LogLineFilter(effectiveConfig, parseResultKeys) - logLineParser = makeNonCsvLogLineParser(effectiveConfig, formatIn) - outputLineFormatter = 
effectiveConfig.formatOut match + + outputLineFormatter = effectiveConfig.formatOut match { case Some(Config.FormatOut.Raw) => RawFormatter() case Some(Config.FormatOut.Pretty) | None => - ColorLineFormatter(effectiveConfig, feedName) + ColorLineFormatter(effectiveConfig, feedConfig.feedName) + } + evaluatedLine <- Stream .emit(line) - .filter(rawFilter(_, rawInclude, rawExclude)) + .filter(rawFilter(_, feedConfig.rawInclude, feedConfig.rawExclude)) .map(logLineParser.parse) .filter(logLineFilter.grep) .filter(logLineFilter.logLineQueryPredicate) @@ -200,14 +140,43 @@ object LogViewerStream { effectiveConfig.timestamp.before ) ) - .map(pr => - Try(outputLineFormatter.formatLine(pr)) match { - case Success(formatted) => formatted.toString - case Failure(_) => pr.raw - } - ) + .map(formatWithSafety(_, outputLineFormatter)) } yield evaluatedLine + private def formatWithSafety( + parseResult: ParseResult, + formatter: OutputLineFormatter + ): String = + Try(formatter.formatLine(parseResult)) match { + case Success(formatted) => formatted.toString + case Failure(_) => parseResult.raw + } + + private case class FeedConfig( + feedName: Option[String], + filter: Option[ru.d10xa.jsonlogviewer.query.QueryAST], + formatIn: Option[FormatIn], + rawInclude: Option[List[String]], + rawExclude: Option[List[String]] + ) + + private def extractFeedConfig( + optConfigYaml: Option[ConfigYaml], + index: Int + ): FeedConfig = { + val feedOpt = optConfigYaml + .flatMap(_.feeds) + .flatMap(_.lift(index)) + + FeedConfig( + feedName = feedOpt.flatMap(_.name), + filter = feedOpt.flatMap(_.filter), + formatIn = feedOpt.flatMap(_.formatIn), + rawInclude = feedOpt.flatMap(_.rawInclude), + rawExclude = feedOpt.flatMap(_.rawExclude) + ) + } + private def commandsAndInlineInputToStream( commands: List[String], inlineInput: Option[String] @@ -223,7 +192,7 @@ object LogViewerStream { case Some(FormatIn.Logfmt) => LogfmtLogLineParser(config) case Some(FormatIn.Csv) => throw new IllegalStateException( - "method makeNonCsvLogLineParserCSV not support csv" + "method makeNonCsvLogLineParser does not support csv" ) case _ => JsonLogLineParser(config, jsonPrefixPostfix) } From b7e1f6ac9f96e3a8fa3360a8f3bd27063c2d54f0 Mon Sep 17 00:00:00 2001 From: Andrey Stolyarov Date: Wed, 19 Mar 2025 00:15:22 +0300 Subject: [PATCH 3/4] Add excludeFields to yaml --- .../decline/yaml/ConfigYamlLoaderImpl.scala | 9 ++++- .../decline/yaml/ConfigYamlLoaderTest.scala | 37 ++++++++++++++++++ .../d10xa/jsonlogviewer/LogViewerStream.scala | 16 ++++---- .../jsonlogviewer/csv/CsvLogLineParser.scala | 7 ---- .../jsonlogviewer/decline/yaml/Feed.scala | 3 +- .../formatout/ColorLineFormatter.scala | 39 +++++++++++++------ 6 files changed, 83 insertions(+), 28 deletions(-) diff --git a/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala b/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala index d3b53c5..2d227e2 100644 --- a/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala +++ b/json-log-viewer/jvm/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderImpl.scala @@ -5,7 +5,6 @@ import cats.data.Validated import cats.data.ValidatedNel import cats.syntax.all.* import io.circe.* -import io.circe.generic.auto.* import io.circe.yaml.scalayaml.parser import ru.d10xa.jsonlogviewer.decline.Config.FormatIn import ru.d10xa.jsonlogviewer.decline.FormatInValidator @@ -148,6 +147,11 @@ class ConfigYamlLoaderImpl extends 
ConfigYamlLoader {
       parseOptionalListString(feedFields, "rawInclude")
     val rawExcludeValidated =
       parseOptionalListString(feedFields, "rawExclude")
+    val excludeFieldsValidated =
+      parseOptionalListString(
+        feedFields,
+        "excludeFields"
+      )
     (
       nameValidated,
       commandsValidated,
       inlineInputValidated,
       filterValidated,
       formatInValidated,
       rawIncludeValidated,
-      rawExcludeValidated
+      rawExcludeValidated,
+      excludeFieldsValidated
     )
       .mapN(Feed.apply)
   }
diff --git a/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala b/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala
index dd0a087..18e5357 100644
--- a/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala
+++ b/json-log-viewer/jvm/src/test/scala/ru/d10xa/jsonlogviewer/decline/yaml/ConfigYamlLoaderTest.scala
@@ -70,4 +70,41 @@ class ConfigYamlLoaderTest extends FunSuite {
     val errors = result.swap.toOption.get
     assert(errors.exists(_.contains("Invalid 'feeds' field format, should be a list")))
   }
+
+  test("parse valid yaml with excludeFields") {
+    val yaml =
+      """|feeds:
+         |  - name: "pod-logs"
+         |    commands:
+         |      - "./mock-logs.sh pod1"
+         |    excludeFields:
+         |      - "level"
+         |      - "logger_name"
+         |      - "thread_name"
+         |  - name: "service-logs"
+         |    commands:
+         |      - "./mock-logs.sh service1"
+         |    excludeFields:
+         |      - "@timestamp"
+         |""".stripMargin
+
+    val result = configYamlLoader.parseYamlFile(yaml)
+    assert(result.isValid, s"Result should be valid: $result")
+
+    val config = result.toOption.get
+
+    val feeds = config.feeds.get
+    assertEquals(feeds.size, 2)
+
+    val feed1 = feeds.head
+    assertEquals(feed1.name, Some("pod-logs"))
+    assertEquals(
+      feed1.excludeFields,
+      Some(List("level", "logger_name", "thread_name"))
+    )
+
+    val feed2 = feeds(1)
+    assertEquals(feed2.name, Some("service-logs"))
+    assertEquals(feed2.excludeFields, Some(List("@timestamp")))
+  }
 }
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
index ab29ef1..3d3b057 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/LogViewerStream.scala
@@ -69,7 +69,6 @@ object LogViewerStream {
     index: Int,
     initialFormatIn: Option[FormatIn]
   ): Stream[IO, String] =
-    // Handle CSV separately because it needs a header line
     if (initialFormatIn.contains(FormatIn.Csv)) {
       lines.pull.uncons1.flatMap {
         case Some((headerLine, rest)) =>
@@ -123,7 +122,7 @@
       outputLineFormatter = effectiveConfig.formatOut match {
         case Some(Config.FormatOut.Raw) => RawFormatter()
         case Some(Config.FormatOut.Pretty) | None =>
-          ColorLineFormatter(effectiveConfig, feedConfig.feedName)
+          ColorLineFormatter(effectiveConfig, feedConfig.feedName, feedConfig.excludeFields)
       }
 
       evaluatedLine <- Stream
@@ -152,18 +151,20 @@
         case Failure(_) => parseResult.raw
       }
 
+  // TODO
   private case class FeedConfig(
     feedName: Option[String],
     filter: Option[ru.d10xa.jsonlogviewer.query.QueryAST],
     formatIn: Option[FormatIn],
     rawInclude: Option[List[String]],
-    rawExclude: Option[List[String]]
+    rawExclude: Option[List[String]],
+    excludeFields: Option[List[String]]
   )
 
   private def extractFeedConfig(
-      optConfigYaml: Option[ConfigYaml],
-      index: Int
-  ): FeedConfig = {
+    optConfigYaml: Option[ConfigYaml],
+    index: Int
+  ): FeedConfig = {
     val feedOpt = optConfigYaml
       .flatMap(_.feeds)
       .flatMap(_.lift(index))
@@ -173,7 +174,8 @@
      feedName = feedOpt.flatMap(_.name),
      filter = feedOpt.flatMap(_.filter),
      formatIn = feedOpt.flatMap(_.formatIn),
      rawInclude = feedOpt.flatMap(_.rawInclude),
-      rawExclude = feedOpt.flatMap(_.rawExclude)
+      rawExclude = feedOpt.flatMap(_.rawExclude),
+      excludeFields = feedOpt.flatMap(_.excludeFields)
     )
   }
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala
index 2a1a60a..3df3d12 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/csv/CsvLogLineParser.scala
@@ -59,13 +59,6 @@ class CsvLogLineParser(config: Config, headers: List[String]) extends LogLinePar
 }
 
 object CsvLogLineParser {
-  /**
-   * Creates a CSV log parser based on a header line.
-   *
-   * @param config configuration
-   * @param headerLine CSV header line
-   * @return CSV log parser
-   */
   def apply(config: Config, headerLine: String): CsvLogLineParser = {
     val csvParser = new CsvParser()
     val headers = csvParser.parseLine(headerLine)
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala
index 6a0d7f7..f60615d 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/decline/yaml/Feed.scala
@@ -10,5 +10,6 @@ case class Feed(
   filter: Option[QueryAST],
   formatIn: Option[FormatIn],
   rawInclude: Option[List[String]],
-  rawExclude: Option[List[String]]
+  rawExclude: Option[List[String]],
+  excludeFields: Option[List[String]]
 )
diff --git a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala
index efedaf6..d94b734 100644
--- a/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala
+++ b/json-log-viewer/shared/src/main/scala/ru/d10xa/jsonlogviewer/formatout/ColorLineFormatter.scala
@@ -3,17 +3,21 @@ package ru.d10xa.jsonlogviewer.formatout
 import fansi.ErrorMode.Strip
 import fansi.EscapeAttr
 import fansi.Str
+import ru.d10xa.jsonlogviewer.HardcodedFieldNames._
 import ru.d10xa.jsonlogviewer.OutputLineFormatter
 import ru.d10xa.jsonlogviewer.ParseResult
 import ru.d10xa.jsonlogviewer.decline.Config
 
-class ColorLineFormatter(c: Config, feedName: Option[String])
+class ColorLineFormatter(c: Config, feedName: Option[String], excludeFields: Option[List[String]])
   extends OutputLineFormatter:
   private val strEmpty: Str = Str("")
   private val strSpace: Str = Str(" ")
   private val strNewLine: Str = Str("\n")
   extension (s: String) def ansiStrip: Str = Str(s, Strip)
 
+  def shouldExcludeField(fieldName: String): Boolean =
+    excludeFields.exists(_.contains(fieldName))
+
   def levelToColor(level: String): EscapeAttr =
     level match
       case "DEBUG" => fansi.Color.LightGray
@@ -22,12 +26,14 @@ class ColorLineFormatter(c: Config, feedName: Option[String])
       case _ => fansi.Color.White
 
   def strLevel(levelOpt: Option[String], colorAttr: EscapeAttr): Seq[Str] =
-    levelOpt match
+    if (shouldExcludeField(levelFieldName)) Nil
+    else levelOpt match
      case Some(level) => strSpace ::
colorAttr(s"[${level.ansiStrip}]") :: Nil case None => Nil def strMessage(messageOpt: Option[String], colorAttr: EscapeAttr): Seq[Str] = - messageOpt match + if (shouldExcludeField(messageFieldName)) Nil + else messageOpt match case Some(message) => strSpace :: colorAttr(message.ansiStrip) :: Nil case None => Nil @@ -35,7 +41,8 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) stackTraceOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - stackTraceOpt match + if (shouldExcludeField(stackTraceFieldName)) Nil + else stackTraceOpt match case Some(s) => strNewLine :: colorAttr(s.ansiStrip) :: Nil case None => Nil @@ -43,7 +50,8 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) loggerNameOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - loggerNameOpt match + if (shouldExcludeField(loggerNameFieldName)) Nil + else loggerNameOpt match case Some(loggerName) => strSpace :: colorAttr(loggerName.ansiStrip) :: Nil case None => Nil @@ -52,7 +60,8 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) timestampOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - timestampOpt match + if (shouldExcludeField(c.timestamp.fieldName)) Nil + else timestampOpt match case Some(timestamp) => strSpace :: colorAttr(timestamp.ansiStrip) :: Nil case None => Nil @@ -61,7 +70,8 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) threadNameOpt: Option[String], colorAttr: EscapeAttr ): Seq[Str] = - threadNameOpt match + if (shouldExcludeField(threadNameFieldName)) Nil + else threadNameOpt match case Some(threadName) => strSpace :: colorAttr(s"[${threadName.ansiStrip}]") :: Nil case None => Nil @@ -70,7 +80,11 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) otherAttributes: Map[String, String], needNewLine: Boolean ): Seq[Str] = - otherAttributes match + val filteredAttributes = otherAttributes.filterNot { case (key, _) => + shouldExcludeField(key) + } + + filteredAttributes match case m if m.isEmpty => Nil case m => val s = fansi.Str.join( @@ -88,19 +102,22 @@ class ColorLineFormatter(c: Config, feedName: Option[String]) (if (needNewLine) strNewLine else strEmpty) :: s :: Nil def strPrefix(s: Option[String]): Seq[Str] = - s match + if (shouldExcludeField("prefix")) Nil + else s match case Some(prefix) => fansi.Color.White(prefix.ansiStrip) :: strSpace :: Nil case None => Nil def strFeedName(s: Option[String]): Seq[Str] = - s match + if (shouldExcludeField("feed_name")) Nil + else s match case Some(feedName) => fansi.Color.White(feedName.ansiStrip) :: strSpace :: Nil case None => Nil def strPostfix(s: Option[String]): Seq[Str] = - s match + if (shouldExcludeField("postfix")) Nil + else s match case Some(postfix) => strSpace :: fansi.Color.White(postfix.ansiStrip) :: Nil case None => Nil From f835c9f1ada8acf726f219cb3f3e42efe32d35df Mon Sep 17 00:00:00 2001 From: Andrey Stolyarov Date: Wed, 19 Mar 2025 00:25:31 +0300 Subject: [PATCH 4/4] Fix frontend --- .../src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala index 7a53518..d8947b2 100644 --- a/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala +++ b/frontend-laminar/src/main/scala/ru/d10xa/jsonlogviewer/ViewElement.scala @@ -35,7 +35,8 @@ object ViewElement { filter = config.filter, formatIn = config.formatIn, rawInclude = None, - rawExclude = 
None + rawExclude = None, + excludeFields = None ) ) )
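
A minimal usage sketch of the CSV pieces introduced in this series. The Config and TimestampConfig constructor calls are copied from CsvLogLineParserTest in PATCH 1; the "custom" column is a made-up example, and the results in the comments are worked out from the CsvParser and CsvLogLineParser code above, not taken from a run:

    import ru.d10xa.jsonlogviewer.csv.CsvLogLineParser
    import ru.d10xa.jsonlogviewer.csv.CsvParser
    import ru.d10xa.jsonlogviewer.decline.Config
    import ru.d10xa.jsonlogviewer.decline.TimestampConfig

    @main def csvFormatInDemo(): Unit =
      // CsvParser.parseLine: a quoted field may contain the delimiter,
      // and a doubled quote inside a quoted field becomes a literal quote.
      println(new CsvParser().parseLine("a,\"b,c\",\"d\"\"e\""))
      // List(a, b,c, d"e)

      // Config shape copied from CsvLogLineParserTest (PATCH 1).
      val config = Config(
        configFile = None,
        timestamp = TimestampConfig(
          fieldName = "@timestamp",
          after = None,
          before = None
        ),
        grep = List.empty,
        filter = None,
        formatIn = Some(Config.FormatIn.Csv),
        formatOut = None
      )

      // The first input line is the CSV header; every later line is mapped
      // to fields by column index. Headers that are not one of the hardcoded
      // field names ("custom" here) end up in otherAttributes.
      val parser = CsvLogLineParser(config, "@timestamp,level,message,custom")
      val result = parser.parse("2023-01-01T12:00:00Z,INFO,hello,extra")
      println(result.parsed.flatMap(_.timestamp))   // Some(2023-01-01T12:00:00Z)
      println(result.parsed.flatMap(_.message))     // Some(hello)
      println(result.parsed.map(_.otherAttributes)) // Some(Map(custom -> extra))

On stdin this is exactly what --format-in csv does: LogViewerStream pulls the first line as the header (uncons1) and runs the remaining lines through the resulting parser.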