Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy val root = (project in file("."))
"eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV,
"com.github.alexarchambault" %% "case-app" % "2.1.0-M30",
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"org.yaml" % "snakeyaml" % "2.4" % Test,
),
scalacOptions ++= Seq(
"-Wunused:imports",
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/eu/neverblink/jelly/cli/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ object App extends CommandsEntryPoint:
Version,
RdfFromJelly,
RdfToJelly,
RdfInspect,
)
3 changes: 1 addition & 2 deletions src/main/scala/eu/neverblink/jelly/cli/Exceptions.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package eu.neverblink.jelly.cli

import com.google.protobuf.InvalidProtocolBufferException
import org.apache.jena.riot.RiotException

/** Contains a set of common jelly-cli exceptions with custom output messages.
Expand All @@ -22,7 +21,7 @@ case class JellyTranscodingError(message: String)
extends CriticalException(s"Jelly transcoding error: $message")
case class JenaRiotException(e: RiotException)
extends CriticalException(s"Jena RDF I/O exception: ${e.getMessage}")
case class InvalidJellyFile(e: InvalidProtocolBufferException)
case class InvalidJellyFile(e: Exception)
extends CriticalException(s"Invalid Jelly file: ${e.getMessage}")
case class InvalidFormatSpecified(format: String, validFormats: String)
extends CriticalException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.Jena.*
import eu.neverblink.jelly.cli.util.JellyUtil
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import eu.ostrzyciel.jelly.core.IoUtils
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.riot.{Lang, RDFParser}

Expand All @@ -25,7 +25,7 @@ case class RdfFromJellyOptions(
@ExtraName("out-format") outputFormat: Option[String] = None,
) extends HasJellyCommandOptions

object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
object RdfFromJelly extends RdfTranscodeCommand[RdfFromJellyOptions, RdfFormat.Writeable]:

override def names: List[List[String]] = List(
List("rdf", "from-jelly"),
Expand Down Expand Up @@ -83,30 +83,10 @@ object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]
outputStream.write(frame.getBytes)

try {
iterateRdfStream(inputStream, outputStream).zipWithIndex.foreach {
JellyUtil.iterateRdfStream(inputStream).zipWithIndex.foreach {
case (maybeFrame, frameIndex) =>
writeFrameToOutput(maybeFrame, frameIndex)
}
} finally {
outputStream.flush()
}

/** This method reads the Jelly file and returns an iterator of RdfStreamFrame
* @param inputStream
* @param outputStream
* @return
*/
private def iterateRdfStream(
inputStream: InputStream,
outputStream: OutputStream,
): Iterator[RdfStreamFrame] =
IoUtils.autodetectDelimiting(inputStream) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
Iterator(RdfStreamFrame.parseFrom(newIn))
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined).map(_.get)
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.{ExtraName, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.neverblink.jelly.cli.*
import eu.ostrzyciel.jelly.core.proto.v1.*

import java.io.InputStream

case class RdfInspectOptions(
@Recurse
common: JellyCommandOptions = JellyCommandOptions(),
@ExtraName("to") outputFile: Option[String] = None,
@ExtraName("per-frame") perFrame: Boolean = false,
) extends HasJellyCommandOptions

object RdfInspect extends JellyCommand[RdfInspectOptions]:

override def names: List[List[String]] = List(
List("rdf", "inspect"),
)

override final def group = "rdf"

override def doRun(options: RdfInspectOptions, remainingArgs: RemainingArgs): Unit =
val (inputStream, outputStream) =
this.getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
val (streamOpts, frameIterator) = inspectJelly(inputStream)
if options.perFrame then MetricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
else MetricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)

private def inspectJelly(
inputStream: InputStream,
): (RdfStreamOptions, Iterator[FrameInfo]) =

inline def computeMetrics(
frame: RdfStreamFrame,
frameIndex: Int,
): FrameInfo =
val metrics = new FrameInfo(frameIndex)
frame.rows.foreach(r => metricsForRow(r, metrics))
metrics

try {
val allRows = JellyUtil.iterateRdfStream(inputStream).buffered
// we need to check if the first frame contains options
val streamOptions = checkOptions(allRows.headOption)
// We compute the metrics for each frame
// and then sum them all during the printing if desired
val frameIterator = allRows.zipWithIndex.map { case (maybeFrame, frameIndex) =>
computeMetrics(maybeFrame, frameIndex)
}
(streamOptions, frameIterator)
} catch {
case e: Exception =>
throw InvalidJellyFile(e)
}

private def metricsForRow(
row: RdfStreamRow,
metadata: FrameInfo,
): Unit =
row.row match {
case r: RdfTriple => metadata.tripleCount += 1
case r: RdfQuad => metadata.quadCount += 1
case r: RdfNameEntry => metadata.nameCount += 1
case r: RdfPrefixEntry => metadata.prefixCount += 1
case r: RdfNamespaceDeclaration => metadata.namespaceCount += 1
case r: RdfDatatypeEntry => metadata.datatypeCount += 1
case r: RdfGraphStart => metadata.graphStartCount += 1
case r: RdfGraphEnd => metadata.graphEndCount += 1
case r: RdfStreamOptions => metadata.optionCount += 1
}

/** Checks whether the first frame in the stream contains options and returns them.
* @param headFrame
* The first frame in the stream as an option.
* @return
* The options from the first frame.
* @throws RuntimeException
* If the first frame does not contain options or if there are no frames in the stream.
*/
private def checkOptions(headFrame: Option[RdfStreamFrame]): RdfStreamOptions =
if headFrame.isEmpty then throw new RuntimeException("No frames in the stream.")
if headFrame.get.rows.isEmpty then throw new RuntimeException("No rows in the frame.")
val frameRows = headFrame.get.rows
frameRows.head.row match {
case r: RdfStreamOptions => r
case _ => throw new RuntimeException("First row of the frame is not an options row.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ case class RdfToJellyOptions(
delimited: Boolean = true,
) extends HasJellyCommandOptions

object RdfToJelly extends RdfCommand[RdfToJellyOptions, RdfFormat.Readable]:
object RdfToJelly extends RdfTranscodeCommand[RdfToJellyOptions, RdfFormat.Readable]:

override def names: List[List[String]] = List(
List("rdf", "to-jelly"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import java.io.{InputStream, OutputStream}

/** This abstract class is responsible for the common logic in both RDF parsing commands
*/
abstract class RdfCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](using
tt: TypeTest[RdfFormat, F],
abstract class RdfTranscodeCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](
using tt: TypeTest[RdfFormat, F],
) extends JellyCommand[T]:

override final def group = "rdf"
Expand Down
27 changes: 27 additions & 0 deletions src/main/scala/eu/neverblink/jelly/cli/util/JellyUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package eu.neverblink.jelly.cli.util

import eu.ostrzyciel.jelly.core.IoUtils
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame

import java.io.InputStream

object JellyUtil:
/** This method reads the Jelly file and returns an iterator of RdfStreamFrame
*
* @param inputStream
* @param outputStream
* @return
*/
def iterateRdfStream(
inputStream: InputStream,
): Iterator[RdfStreamFrame] =
IoUtils.autodetectDelimiting(inputStream) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
Iterator(RdfStreamFrame.parseFrom(newIn))
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined).map(_.get)
131 changes: 131 additions & 0 deletions src/main/scala/eu/neverblink/jelly/cli/util/MetricsPrinter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package eu.neverblink.jelly.cli.util

import eu.neverblink.jelly.cli.util.YamlDocBuilder.*
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions

import java.io.OutputStream

/** This class is used to store the metrics for a single frame
*/
final class FrameInfo(val frameIndex: Int):
var frameCount: Int = 1
var optionCount: Int = 0
var nameCount: Int = 0
var namespaceCount: Int = 0
var tripleCount: Int = 0
var quadCount: Int = 0
var prefixCount: Int = 0
var datatypeCount: Int = 0
var graphStartCount: Int = 0
var graphEndCount: Int = 0

def +=(other: FrameInfo): FrameInfo = {
this.frameCount += 1
this.optionCount += other.optionCount
this.nameCount += other.nameCount
this.namespaceCount += other.namespaceCount
this.tripleCount += other.tripleCount
this.quadCount += other.quadCount
this.prefixCount += other.prefixCount
this.datatypeCount += other.datatypeCount
this.graphStartCount += other.graphStartCount
this.graphEndCount += other.graphEndCount
this
}

end FrameInfo

object MetricsPrinter:

def printPerFrame(
options: RdfStreamOptions,
iterator: Iterator[FrameInfo],
o: OutputStream,
): Unit =
printOptions(options, o)
val builder =
YamlDocBuilder.build(
YamlMap(
"frames" -> YamlBlank(),
),
)
val fullString = builder.getString
o.write(fullString.getBytes)
iterator.foreach { frame =>
val yamlFrame = YamlListElem(formatStatsIndex(frame))
val fullString = YamlDocBuilder.build(yamlFrame, builder.currIndent).getString
o.write(fullString.getBytes)
o.write(System.lineSeparator().getBytes)
}

def printAggregate(
options: RdfStreamOptions,
iterator: Iterator[FrameInfo],
o: OutputStream,
): Unit = {
printOptions(options, o)
val sumCounts = iterator.reduce((a, b) => a += b)
val fullString =
YamlDocBuilder.build(
YamlMap(
"frames" -> formatStatsCount(sumCounts),
),
).getString
o.write(fullString.getBytes)
}

private def printOptions(
printOptions: RdfStreamOptions,
o: OutputStream,
): Unit =
val options = formatOptions(options = printOptions)
val fullString =
YamlDocBuilder.build(
YamlMap(
"stream_options" -> options,
),
).getString
o.write(fullString.getBytes)
o.write(System.lineSeparator().getBytes)

private def formatOptions(
options: RdfStreamOptions,
): YamlMap =
YamlMap(
"stream_name" -> YamlString(options.streamName),
"physical_type" -> YamlEnum(options.physicalType.toString, options.physicalType.value),
"generalized_statements" -> YamlBool(options.generalizedStatements),
"rdf_star" -> YamlBool(options.rdfStar),
"max_name_table_size" -> YamlInt(options.maxNameTableSize),
"max_prefix_table_size" -> YamlInt(options.maxPrefixTableSize),
"max_datatype_table_size" -> YamlInt(options.maxDatatypeTableSize),
"logical_type" -> YamlEnum(options.logicalType.toString, options.logicalType.value),
"version" -> YamlInt(options.version),
)

private def formatStatsIndex(
frame: FrameInfo,
): YamlMap =
YamlMap(Seq(("frame_index", YamlInt(frame.frameIndex))) ++ formatStats(frame)*)

private def formatStatsCount(
frame: FrameInfo,
): YamlMap =
YamlMap(Seq(("frame_count", YamlInt(frame.frameCount))) ++ formatStats(frame)*)

private def formatStats(
frame: FrameInfo,
): Seq[(String, YamlValue)] =
Seq(
("option_count", YamlInt(frame.optionCount)),
("triple_count", YamlInt(frame.tripleCount)),
("quad_count", YamlInt(frame.quadCount)),
("graph_start_count", YamlInt(frame.graphStartCount)),
("graph_end_count", YamlInt(frame.graphEndCount)),
("namespace_count", YamlInt(frame.namespaceCount)),
("name_count", YamlInt(frame.nameCount)),
("prefix_count", YamlInt(frame.prefixCount)),
("datatype_count", YamlInt(frame.datatypeCount)),
)

end MetricsPrinter
Loading