Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ resolvers +=
"Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

lazy val jenaV = "5.3.0"
lazy val jellyV = "2.9.1+8-58db074b-SNAPSHOT"
lazy val jellyV = "2.9.1+10-e92cafe2-SNAPSHOT"

addCommandAlias("fixAll", "scalafixAll; scalafmtAll")

Expand Down
1 change: 1 addition & 0 deletions src/main/scala/eu/neverblink/jelly/cli/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ object App extends CommandsEntryPoint:
RdfFromJelly,
RdfToJelly,
RdfInspect,
RdfValidate,
)
4 changes: 1 addition & 3 deletions src/main/scala/eu/neverblink/jelly/cli/JellyCommand.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package eu.neverblink.jelly.cli

import caseapp.*
import eu.neverblink.jelly.cli.util.IoUtil
import eu.neverblink.jelly.cli.util.io.IoUtil

import java.io.*
import scala.compiletime.uninitialized
Expand Down Expand Up @@ -120,8 +120,6 @@ abstract class JellyCommand[T <: HasJellyCommandOptions: {Parser, Help}] extends
else System.in

final def setStdIn(data: ByteArrayInputStream): Unit =
validateTestMode()
in.reset()
in = data

final def getOutStream: OutputStream =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.Jena.*
import eu.neverblink.jelly.cli.util.JellyUtil
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.Jena.*
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.riot.system.StreamRDFWriter
Expand Down Expand Up @@ -42,9 +43,9 @@ object RdfFromJelly extends RdfTranscodeCommand[RdfFromJellyOptions, RdfFormat.W
parseFormatArgs(inputStream, outputStream, options.outputFormat, options.outputFile)

override def matchFormatToAction(
option: RdfFormat.Writeable,
format: RdfFormat.Writeable,
): Option[(InputStream, OutputStream) => Unit] =
option match
format match
case j: RdfFormat.Jena.Writeable => Some(jellyToLang(j.jenaLang, _, _))
case RdfFormat.JellyText => Some(jellyBinaryToText)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package eu.neverblink.jelly.cli.command.rdf

import caseapp.{ExtraName, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.ostrzyciel.jelly.core.proto.v1.*

import java.io.InputStream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.riot.system.StreamRDFWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.google.protobuf.InvalidProtocolBufferException
import org.apache.jena.riot.RiotException
import eu.neverblink.jelly.cli.*
import caseapp.*
import eu.neverblink.jelly.cli.command.rdf.util.{RdfCommandPrintUtil, RdfFormat}

import scala.reflect.TypeTest
import eu.ostrzyciel.jelly.core.{RdfProtoDeserializationError, RdfProtoSerializationError}
Expand All @@ -25,7 +26,7 @@ abstract class RdfTranscodeCommand[T <: HasJellyCommandOptions: {Parser, Help},
lazy val printUtil: RdfCommandPrintUtil[F]

/** The method responsible for matching the format to a given action */
def matchFormatToAction(option: F): Option[(InputStream, OutputStream) => Unit]
def matchFormatToAction(format: F): Option[(InputStream, OutputStream) => Unit]

/** This method takes care of proper error handling and takes care of the parameter priorities in
* matching the input to a given format conversion
Expand All @@ -42,7 +43,7 @@ abstract class RdfTranscodeCommand[T <: HasJellyCommandOptions: {Parser, Help},
* @throws JenaRiotException
* @throws InvalidJellyFile
*/
def parseFormatArgs(
final def parseFormatArgs(
inputStream: InputStream,
outputStream: OutputStream,
format: Option[String],
Expand Down
216 changes: 216 additions & 0 deletions src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfValidate.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.util.args.IndexRange
import eu.neverblink.jelly.cli.util.io.IoUtil
import eu.neverblink.jelly.cli.util.jena.*
import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.graph.Triple
import org.apache.jena.riot.RDFParser
import org.apache.jena.riot.system.StreamRDFLib
import org.apache.jena.sparql.core.Quad

import scala.util.Using

object RdfValidatePrint extends RdfCommandPrintUtil[RdfFormat.Jena]:
override val defaultFormat: RdfFormat = RdfFormat.NQuads

@HelpMessage(
"Validates a Jelly-RDF stream.\nIf no additional options are specified, " +
"only basic validations are performed. You can also validate the stream against " +
"a reference RDF file, check the stream options, and its delimiting.\n" +
"If an error is detected, the program will exit with a non-zero code.\n" +
"Otherwise, the program will exit with code 0.\n" +
"Note: this command does not work in a streaming manner. If you try to validate a very large " +
"file, you may run out of memory.",
)
@ArgsName("<file-to-validate>")
case class RdfValidateOptions(
@Recurse
common: JellyCommandOptions = JellyCommandOptions(),
@HelpMessage(
"RDF file to compare the input stream to. If not specified, no comparison is done.",
)
compareToRdfFile: Option[String] = None,
@HelpMessage(
"Format of the RDF file to compare the input stream to. If not specified, the format is " +
"inferred from the file name.",
)
compareToFormat: Option[String] = None,
@HelpMessage(
"Whether the comparison should be ordered (statements must come in a specific order) or " +
"unordered (RDF dataset isomorphism). Default: false (unordered)",
)
compareOrdered: Boolean = false,
@HelpMessage(
"Frame indices to compare. If not specified, all frames are compared. " +
"The indices are 0-based and can be specified as a Rust-style range: " +
"'..3', '3..', '1..5', '4..=6'",
)
compareFrameIndices: String = "",
@HelpMessage(
"File with the expected stream options. If not specified, the options are not checked.",
)
optionsFile: Option[String] = None,
@HelpMessage(
"Whether the input stream should be checked to be delimited or undelimited. " +
"Possible values: 'either', 'true', 'false'. Default: 'either'.",
)
delimited: String = "either",
) extends HasJellyCommandOptions

object RdfValidate extends JellyCommand[RdfValidateOptions]:
private enum Delimiting:
case Either, Delimited, Undelimited

override def names: List[List[String]] = List(List("rdf", "validate"))

override def group = "rdf"

override def doRun(options: RdfValidateOptions, remainingArgs: RemainingArgs): Unit =
// Parse input options
val frameIndices = IndexRange(options.compareFrameIndices, "--compare-frame-indices")
val delimiting = options.delimited match
case "" | "either" => Delimiting.Either
case "true" => Delimiting.Delimited
case "false" => Delimiting.Undelimited
case _ =>
throw InvalidArgument(
"--delimited",
options.delimited,
Some("Valid values: true, false, either"),
)
val rdfComparison =
options.compareToRdfFile.map(n => getRdfForComparison(n, options.compareToFormat))
val (inputStream, _) = getIoStreamsFromOptions(remainingArgs.remaining.headOption, None)
val (delimited, frameIterator) = JellyUtil.iterateRdfStreamWithDelimitingInfo(inputStream)

// Step 1: Validate delimiting
validateDelimiting(delimiting, delimited)
// Step 2: Validate basic stream structure & the stream options
val framesSeq = frameIterator.toSeq
validateOptions(framesSeq)
// Step 3: Validate the content
validateContent(framesSeq, frameIndices, rdfComparison)

private def validateDelimiting(
expected: Delimiting,
delimited: Boolean,
): Unit = expected match
case Delimiting.Either => ()
case Delimiting.Delimited =>
if !delimited then
throw CriticalException("Expected delimited input, but the file was not delimited")
case Delimiting.Undelimited =>
if delimited then
throw CriticalException("Expected undelimited input, but the file was delimited")

private def validateOptions(frames: Seq[RdfStreamFrame]): Unit =
// Validate basic stream structure
if frames.isEmpty then throw CriticalException("Empty input stream")
if frames.head.rows.isEmpty then
throw CriticalException("First frame in the input stream is empty")
if !frames.head.rows.head.row.isOptions then
throw CriticalException("First row in the input stream does not contain stream options")
val streamOptions = frames.head.rows.head.row.options
// If we have expected options, we need to read and validate them
val expectedOptions = getOptions.optionsFile.map { optionsFileName =>
val o = Using.resource(IoUtil.inputStream(optionsFileName)) { is =>
JellyUtil.iterateRdfStream(is).next().rows.head.row.options
}
if streamOptions != o then
throw CriticalException(
s"Stream options do not match the expected options in $optionsFileName\n" +
s"Expected: $o\n" +
s"Actual: $streamOptions",
)
o
}
JellyOptions.checkCompatibility(
streamOptions,
expectedOptions.getOrElse(JellyOptions.defaultSupportedOptions),
)

private def validateContent(
frames: Seq[RdfStreamFrame],
frameIndices: IndexRange,
maybeRdfComparison: Option[StreamRdfCollector],
): Unit =
// Prepare data structures
val jellyStreamConsumer =
if maybeRdfComparison.isDefined then StreamRdfCollector()
else StreamRDFLib.sinkNull()
val opt = frames.head.rows.head.row.options
val dec = JenaConverterFactory.anyStatementDecoder(
None,
(prefix, iri) => jellyStreamConsumer.prefix(prefix, iri.getURI),
)
val x = frameIndices.slice(frames).zipWithIndex
for (frame, i) <- x do
val frameIndex = frameIndices.start.getOrElse(0) + i
for row <- frame.rows do
if row.row.isOptions && row.row.options != opt then
throw CriticalException(
s"Later occurrence of stream options in frame $frameIndex does not match the first",
)
// Push the stream frames through the decoder
// This will catch most of the errors
dec.ingestRowFlat(row) match
case null => ()
// Check if the stream really does not contain any RDF-star or generalized statements
// if it doesn't declare to use them. This is normally not checked by the decoder
// because it's too performance-costly.
case t: Triple =>
if !opt.generalizedStatements && StatementUtils.isGeneralized(t) then
throw CriticalException(s"Unexpected generalized triple in frame $frameIndex: $t")
if !opt.rdfStar && StatementUtils.isRdfStar(t) then
throw CriticalException(s"Unexpected RDF-star triple in frame $frameIndex: $t")
jellyStreamConsumer.triple(t)
case q: Quad =>
if !opt.generalizedStatements && StatementUtils.isGeneralized(q) then
throw CriticalException(s"Unexpected generalized quad in frame $frameIndex: $q")
if !opt.rdfStar && StatementUtils.isRdfStar(q) then
throw CriticalException(s"Unexpected RDF-star quad in frame $frameIndex: $q")
jellyStreamConsumer.quad(q)
// Compare the Jelly data with the reference RDF data, if specified
maybeRdfComparison.foreach { rdfComparison =>
val actual = jellyStreamConsumer.asInstanceOf[StreamRdfCollector]
val comparator =
if getOptions.compareOrdered then OrderedRdfCompare
else UnorderedRdfCompare
comparator.compare(rdfComparison, actual)
}

/** Reads the RDF file for comparison and returns a StreamRdfCollector
* @param fileName
* filename to read
* @param formatName
* optional format name
* @return
*/
private def getRdfForComparison(
fileName: String,
formatName: Option[String],
): StreamRdfCollector =
val explicitFormat = formatName.flatMap(RdfFormat.find)
val implicitFormat = RdfFormat.inferFormat(fileName)
val format = (explicitFormat, implicitFormat) match {
case (Some(f: RdfFormat.Jena), _) => f
case (_, Some(f: RdfFormat.Jena)) => f
case (_, _) =>
throw InvalidFormatSpecified(
formatName.getOrElse(""),
RdfValidatePrint.validFormatsString,
)
}
val output = StreamRdfCollector()
Using.resource(IoUtil.inputStream(fileName)) { is =>
RDFParser.source(is)
.lang(format.jenaLang)
.parse(output)
}
output
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eu.neverblink.jelly.cli.command.rdf.util

import eu.ostrzyciel.jelly.core.IoUtils
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame

import java.io.InputStream

object JellyUtil:
/** Reads the Jelly file and returns an iterator of RdfStreamFrame
*
* @param inputStream
* @param outputStream
* @return
*/
def iterateRdfStream(
inputStream: InputStream,
): Iterator[RdfStreamFrame] = iterateRdfStreamWithDelimitingInfo(inputStream)._2

/** Reads the Jelly file and returns an iterator of RdfStreamFrame and a boolean indicating if the
* file is delimited or not
* @param inputStream
* @return
*/
def iterateRdfStreamWithDelimitingInfo(
inputStream: InputStream,
): (Boolean, Iterator[RdfStreamFrame]) =
IoUtils.autodetectDelimiting(inputStream) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
(false, Iterator(RdfStreamFrame.parseFrom(newIn)))
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
(
true,
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined).map(_.get),
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eu.neverblink.jelly.cli.util
package eu.neverblink.jelly.cli.command.rdf.util

import eu.neverblink.jelly.cli.util.YamlDocBuilder.*
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder.*
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions

import java.io.OutputStream
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.neverblink.jelly.cli.command.rdf
package eu.neverblink.jelly.cli.command.rdf.util

import scala.reflect.TypeTest

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.neverblink.jelly.cli.command.rdf
package eu.neverblink.jelly.cli.command.rdf.util

import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import org.apache.jena.riot.{Lang, RDFLanguages}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package eu.neverblink.jelly.cli.command.rdf
package eu.neverblink.jelly.cli.command.rdf.util

import caseapp.*
import eu.neverblink.jelly.cli.InvalidArgument
import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory}
import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions}
import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory}

/** Options for serializing in Jelly-RDF */
case class RdfJellySerializationOptions(
Expand Down
Loading