Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ resolvers +=
"Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots"

lazy val jenaV = "5.3.0"
lazy val jellyV = "2.10.3"
lazy val jellyV = "3.0.0"

addCommandAlias("fixAll", "scalafixAll; scalafmtAll")

Expand Down Expand Up @@ -35,7 +35,8 @@ lazy val root = (project in file("."))
"org.slf4j" % "slf4j-simple" % "2.0.17",
"org.apache.jena" % "jena-core" % jenaV,
"org.apache.jena" % "jena-arq" % jenaV,
"eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV,
"eu.neverblink.jelly" % "jelly-jena" % jellyV,
"eu.neverblink.jelly" % "jelly-core-protos-google" % jellyV,
"com.github.alexarchambault" %% "case-app" % "2.1.0-M30",
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"org.yaml" % "snakeyaml" % "2.4" % Test,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object Version extends JellyCommand[VersionOptions]:
val jenaV = BuildInfo.libraryDependencies
.find(_.startsWith("org.apache.jena:jena-core:")).get.split(":")(2)
val jellyV = BuildInfo.libraryDependencies
.find(_.startsWith("eu.ostrzyciel.jelly:jelly-jena:")).get.split(":")(2)
.find(_.startsWith("eu.neverblink.jelly:jelly-jena:")).get.split(":")(2)
printLine(f"""
|jelly-cli ${BuildInfo.version}
|----------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.neverblink.jelly.cli.util.args.IndexRange
import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.graph.Triple
import eu.neverblink.jelly.convert.jena.JenaConverterFactory
import eu.neverblink.jelly.core.JellyOptions
import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler
import eu.neverblink.jelly.core.proto.v1.RdfStreamFrame
import eu.neverblink.jelly.core.proto.google.v1 as google
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.riot.Lang
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.sparql.core.Quad

import java.io.{InputStream, OutputStream}
import scala.jdk.CollectionConverters.*

object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]:
override val defaultFormat: RdfFormat = RdfFormat.NQuads
Expand Down Expand Up @@ -89,30 +93,40 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
val writer = StreamRDFWriter.getWriterStream(outputStream, jenaLang)
// Whether the output is active at this moment
var outputEnabled = false
val decoder = JenaConverterFactory.anyStatementDecoder(
val handler = new AnyStatementHandler[Node] {
override def handleNamespace(prefix: String, namespace: Node): Unit = {
if outputEnabled then writer.prefix(prefix, namespace.getURI)
}

override def handleTriple(subject: Node, predicate: Node, `object`: Node): Unit = {
if outputEnabled then writer.triple(Triple.create(subject, predicate, `object`))
}

override def handleQuad(subject: Node, predicate: Node, `object`: Node, graph: Node): Unit = {
if outputEnabled then writer.quad(Quad.create(graph, subject, predicate, `object`))
}
}

val decoder = JenaConverterFactory.getInstance().anyStatementDecoder(
// Only pass on the namespaces to the writer if the output is enabled
namespaceHandler = (String, Node) => {
if outputEnabled then writer.prefix(String, Node.getURI)
},
handler,
JellyOptions.DEFAULT_SUPPORTED_OPTIONS,
)

val inputFrames = takeFrames.end match
case Some(end) => JellyUtil.iterateRdfStream(inputStream).take(end)
case None => JellyUtil.iterateRdfStream(inputStream)
val startFrom = takeFrames.start.getOrElse(0)
for (frame, i) <- inputFrames.zipWithIndex do
// If we are not yet in the output range, still fully parse the frame and update the decoder
// state. We need this to decode the later frames correctly.
if i < startFrom then for row <- frame.rows do decoder.ingestRowFlat(row)
if i < startFrom then for row <- frame.getRows.asScala do decoder.ingestRow(row)
else
// TODO: write frame index as a comment here
// https://github.com/Jelly-RDF/cli/issues/4
outputEnabled = true
// We are in the output range, so we can start writing the output
for row <- frame.rows do
decoder.ingestRowFlat(row) match
case null => ()
case t: Triple => writer.triple(t)
case q: Quad => writer.quad(q)
for row <- frame.getRows.asScala do decoder.ingestRow(row)
writer.finish()

/** This method reads the Jelly file, rewrites it to Jelly text and writes it to some output
Expand All @@ -128,7 +142,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
// we want to write a comment to the file before each frame
val comment = f"# Frame $frameIndex\n"
outputStream.write(comment.getBytes)
val frame = f.toProtoString
val frame = google.RdfStreamFrame.parseFrom(f.toByteArray).toString
// the protoString is basically the jelly-txt format already
outputStream.write(frame.getBytes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import caseapp.{ArgsName, ExtraName, HelpMessage, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.ostrzyciel.jelly.core.proto.v1.*
import eu.neverblink.jelly.core.proto.v1.*

import scala.jdk.CollectionConverters.*

import java.io.InputStream
@HelpMessage(
Expand Down Expand Up @@ -57,8 +59,11 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
frame: RdfStreamFrame,
frameIndex: Int,
): FrameInfo =
val metrics = new FrameInfo(frameIndex, frame.metadata)
frame.rows.foreach(r => metricsForRow(r, metrics))
val metrics = new FrameInfo(
frameIndex,
frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap,
)
frame.getRows.asScala.foreach(r => metricsForRow(r, metrics))
metrics

try {
Expand All @@ -80,7 +85,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
row: RdfStreamRow,
metadata: FrameInfo,
): Unit =
row.row match {
row.getRow match {
case r: RdfTriple => metadata.tripleCount += 1
case r: RdfQuad => metadata.quadCount += 1
case r: RdfNameEntry => metadata.nameCount += 1
Expand All @@ -102,9 +107,10 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
*/
private def checkOptions(headFrame: Option[RdfStreamFrame]): RdfStreamOptions =
if headFrame.isEmpty then throw new RuntimeException("No frames in the stream.")
if headFrame.get.rows.isEmpty then throw new RuntimeException("No rows in the frame.")
val frameRows = headFrame.get.rows
frameRows.head.row match {
if headFrame.get.getRows.asScala.isEmpty then
throw new RuntimeException("No rows in the frame.")
val frameRows = headFrame.get.getRows.asScala
frameRows.head.getRow match {
case r: RdfStreamOptions => r
case _ => throw new RuntimeException("First row of the frame is not an options row.")
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import caseapp.*
import eu.neverblink.jelly.cli.command.rdf.util.{RdfCommandPrintUtil, RdfFormat}

import scala.reflect.TypeTest
import eu.ostrzyciel.jelly.core.{RdfProtoDeserializationError, RdfProtoSerializationError}
import eu.neverblink.jelly.core.{RdfProtoDeserializationError, RdfProtoSerializationError}

import java.io.{InputStream, OutputStream}

Expand Down
54 changes: 30 additions & 24 deletions src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.*
import com.google.protobuf.TextFormat
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.neverblink.jelly.cli.util.jena.riot.JellyStreamWriterGraphs
import eu.ostrzyciel.jelly.convert.jena.riot.{JellyFormatVariant, JellyLanguage, JellyStreamWriter}
import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamFrame, RdfStreamOptions}
import eu.neverblink.jelly.convert.jena.JenaConverterFactory
import eu.neverblink.jelly.convert.jena.riot.{JellyFormatVariant, JellyLanguage, JellyStreamWriter}
import eu.neverblink.jelly.core.proto.google.v1 as google
import eu.neverblink.jelly.core.proto.v1.{LogicalStreamType, PhysicalStreamType, RdfStreamOptions}
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.riot.{Lang, RDFParser, RIOT}

Expand Down Expand Up @@ -102,25 +105,27 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable
val jellyOpt = getOptions.jellySerializationOptions.asRdfStreamOptions
// Configure the writer
val jellyWriter =
if jellyOpt.physicalType.isGraphs then
if jellyOpt.getPhysicalType == PhysicalStreamType.GRAPHS then
// GRAPHS
JellyStreamWriterGraphs(
JellyFormatVariant(
// By default, set the logical stream type to FLAT_QUADS (this is what JellyStreamWriter
// in jelly-jena does for physical type QUADS).
opt = jellyOpt.withLogicalType(
if jellyOpt.logicalType.isUnspecified then LogicalStreamType.FLAT_QUADS
else jellyOpt.logicalType,
),
frameSize = getOptions.rowsPerFrame,
enableNamespaceDeclarations = getOptions.enableNamespaceDeclarations,
delimited = getOptions.delimited,
),
JellyFormatVariant
.builder()
.options(
jellyOpt.clone.setLogicalType(
if jellyOpt.getLogicalType == LogicalStreamType.UNSPECIFIED then
LogicalStreamType.FLAT_QUADS
else jellyOpt.getLogicalType,
),
)
.frameSize(getOptions.rowsPerFrame)
.enableNamespaceDeclarations(getOptions.enableNamespaceDeclarations)
.isDelimited(getOptions.delimited)
.build(),
out = outputStream,
)
else
// TRIPLES or QUADS
if jellyOpt.physicalType.isUnspecified then
if jellyOpt.getPhysicalType == PhysicalStreamType.UNSPECIFIED then
if !isQuietMode && isLogicalGrouped(jellyOpt) then
printLine(
"WARNING: Logical type setting ignored because physical type is not set. " +
Expand All @@ -145,13 +150,14 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable
)
else
// If the physical type is specified, we can just construct the writer
val variant = JellyFormatVariant(
opt = jellyOpt,
frameSize = getOptions.rowsPerFrame,
enableNamespaceDeclarations = getOptions.enableNamespaceDeclarations,
delimited = getOptions.delimited,
)
JellyStreamWriter(variant, outputStream)
val variant = JellyFormatVariant
.builder()
.options(jellyOpt)
.frameSize(getOptions.rowsPerFrame)
.enableNamespaceDeclarations(getOptions.enableNamespaceDeclarations)
.isDelimited(getOptions.delimited)
.build()
JellyStreamWriter(JenaConverterFactory.getInstance(), variant, outputStream)

RDFParser.source(inputStream).lang(jenaLang).parse(jellyWriter)

Expand All @@ -172,7 +178,7 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable
Using.resource(InputStreamReader(inputStream)) { r1 =>
Using.resource(BufferedReader(r1)) { reader =>
jellyTextStreamAsFrames(reader)
.map(txt => RdfStreamFrame.fromAscii(txt))
.map(txt => TextFormat.parse(txt, classOf[google.RdfStreamFrame]))
.foreach(frame => {
if getOptions.delimited then frame.writeDelimitedTo(outputStream)
else frame.writeTo(outputStream)
Expand All @@ -189,7 +195,7 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable
private def isLogicalGrouped(
jellyOpt: RdfStreamOptions,
): Boolean =
!(jellyOpt.logicalType.isFlatQuads || jellyOpt.logicalType.isFlatTriples || jellyOpt.logicalType.isUnspecified)
!(jellyOpt.getLogicalType == LogicalStreamType.FLAT_QUADS || jellyOpt.getLogicalType == LogicalStreamType.FLAT_TRIPLES || jellyOpt.getLogicalType == LogicalStreamType.UNSPECIFIED)

/** Iterate over a Jelly text stream and return the frames as strings to be parsed.
* @param reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package eu.neverblink.jelly.cli.command.rdf
import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.ostrzyciel.jelly.core.RdfProtoError
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
import eu.ostrzyciel.jelly.core.{JellyOptions, ProtoTranscoder}
import eu.neverblink.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions}
import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory, RdfProtoTranscodingError}

import java.io.{InputStream, OutputStream}
import scala.jdk.CollectionConverters.*

@HelpMessage(
"Quickly transcodes the input Jelly file into another Jelly file.\n" +
Expand Down Expand Up @@ -44,7 +44,7 @@ object RdfTranscode extends JellyCommand[RdfTranscodeOptions]:
val (inputStream, outputStream) =
getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
try jellyToJelly(inputStream, outputStream, outOpt)
catch case e: RdfProtoError => throw JellyTranscodingError(e.getMessage)
catch case e: RdfProtoTranscodingError => throw JellyTranscodingError(e.getMessage)

/** Transcodes the input Jelly stream into another Jelly stream.
* @param inputStream
Expand All @@ -61,20 +61,21 @@ object RdfTranscode extends JellyCommand[RdfTranscodeOptions]:
): Unit =
val in = JellyUtil.iterateRdfStream(inputStream).buffered
val head = in.head
if head.rows.isEmpty then throw CriticalException("Empty input stream")
if !head.rows.head.row.isOptions then
if head.getRows.asScala.isEmpty then throw CriticalException("Empty input stream")
if !head.getRows.asScala.head.hasOptions then
throw CriticalException("First input row is not an options row")
val inOpt = head.rows.head.row.options
val inOpt = head.getRows.asScala.head.getOptions

val transcoder = ProtoTranscoder.fastMergingTranscoder(
supportedInputOptions = JellyOptions.defaultSupportedOptions,
outputOptions = outOpt.copy(
val transcoder = JellyTranscoderFactory.fastMergingTranscoder(
JellyOptions.DEFAULT_SUPPORTED_OPTIONS,
outOpt.clone()
// There is no way to specify the physical type with options currently.
// Just use the one from the input.
physicalType = inOpt.physicalType,
logicalType =
if outOpt.logicalType.isUnspecified then inOpt.logicalType else outOpt.logicalType,
),
.setPhysicalType(inOpt.getPhysicalType)
.setLogicalType(
if outOpt.getLogicalType == LogicalStreamType.UNSPECIFIED then inOpt.getLogicalType
else outOpt.getLogicalType,
),
)

in.map(transcoder.ingestFrame)
Expand Down
Loading