Skip to content

Commit

Permalink
bugfix: Fix concurrency problems in test code (#2653)
Browse files Browse the repository at this point in the history
  • Loading branch information
seakayone committed May 16, 2023
1 parent 7af1c27 commit 423b352
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import scala.util.Try

import org.knora.webapi.IRI
import org.knora.webapi.messages.util.rdf._
import org.knora.webapi.messages.util.rdf.jenaimpl.JenaFormatUtil.rdfFormatToJenaParsingLang

/**
* Wraps an [[RdfStreamProcessor]] in a [[jena.riot.system.StreamRDF]].
Expand Down Expand Up @@ -67,14 +68,6 @@ class JenaFormatUtil(private val modelFactory: JenaModelFactory, private val nod

override def getRdfNodeFactory: RdfNodeFactory = nodeFactory

private def rdfFormatToJenaParsingLang(rdfFormat: NonJsonLD): jena.riot.Lang =
rdfFormat match {
case Turtle => jena.riot.RDFLanguages.TURTLE
case TriG => jena.riot.RDFLanguages.TRIG
case RdfXml => jena.riot.RDFLanguages.RDFXML
case NQuads => jena.riot.RDFLanguages.NQUADS
}

override def parseNonJsonLDToRdfModel(rdfStr: String, rdfFormat: NonJsonLD): RdfModel = {
val jenaModel: JenaModel = modelFactory.makeEmptyModel

Expand Down Expand Up @@ -222,3 +215,13 @@ class JenaFormatUtil(private val modelFactory: JenaModelFactory, private val nod
}
}
}

object JenaFormatUtil {
def rdfFormatToJenaParsingLang(rdfFormat: NonJsonLD): jena.riot.Lang =
rdfFormat match {
case Turtle => jena.riot.RDFLanguages.TURTLE
case TriG => jena.riot.RDFLanguages.TRIG
case RdfXml => jena.riot.RDFLanguages.RDFXML
case NQuads => jena.riot.RDFLanguages.NQUADS
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import java.nio.charset.StandardCharsets
import java.nio.file.Path
import java.nio.file.Paths
import scala.collection.mutable
import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.IteratorHasAsScala

Expand All @@ -54,7 +53,6 @@ import org.knora.webapi.messages.store.triplestoremessages.SparqlUpdateResponse
import org.knora.webapi.messages.util.rdf.QuadFormat
import org.knora.webapi.messages.util.rdf.RdfFeatureFactory
import org.knora.webapi.messages.util.rdf.RdfFormatUtil
import org.knora.webapi.messages.util.rdf.RdfInputStreamSource
import org.knora.webapi.messages.util.rdf.RdfModel
import org.knora.webapi.messages.util.rdf.RdfStringSource
import org.knora.webapi.messages.util.rdf.SparqlSelectResult
Expand All @@ -63,6 +61,7 @@ import org.knora.webapi.messages.util.rdf.SparqlSelectResultHeader
import org.knora.webapi.messages.util.rdf.Statement
import org.knora.webapi.messages.util.rdf.Turtle
import org.knora.webapi.messages.util.rdf.VariableResultsRow
import org.knora.webapi.messages.util.rdf.jenaimpl.JenaFormatUtil
import org.knora.webapi.store.triplestore.api.TriplestoreServiceInMemory.createEmptyDataset
import org.knora.webapi.store.triplestore.defaults.DefaultRdfData
import org.knora.webapi.store.triplestore.errors.TriplestoreException
Expand All @@ -71,7 +70,7 @@ import org.knora.webapi.store.triplestore.errors.TriplestoreTimeoutException
import org.knora.webapi.store.triplestore.errors.TriplestoreUnsupportedFeatureException
import org.knora.webapi.util.ZScopedJavaIoStreams.byteArrayOutputStream
import org.knora.webapi.util.ZScopedJavaIoStreams.fileInputStream
import org.knora.webapi.util.ZScopedJavaIoStreams.outputStreamPipedToInputStream
import org.knora.webapi.util.ZScopedJavaIoStreams.fileOutputStream

final case class TriplestoreServiceInMemory(datasetRef: Ref[Dataset], implicit val sf: StringFormatter)
extends TriplestoreService {
Expand Down Expand Up @@ -231,35 +230,17 @@ final case class TriplestoreServiceInMemory(datasetRef: Ref[Dataset], implicit v
graphIri: IRI,
outputFile: Path,
outputFormat: QuadFormat
): Task[FileWrittenResponse] = {
val rdfFormatUtil: RdfFormatUtil = RdfFeatureFactory.getRdfFormatUtil()
ZIO.scoped {
for {
inOut <- outputStreamPipedToInputStream()
ds <- datasetRef.get
_ <- ZIO.attemptBlocking {
val readFromModel = new Thread {
override def run(): Unit = {
ds.begin(ReadWrite.READ)
try {
ds.getNamedModel(graphIri).write(inOut._2, "TURTLE")
} finally {
ds.end()
}
}
}
val writeToFile = new Thread {
override def run(): Unit =
rdfFormatUtil.turtleToQuadsFile(RdfInputStreamSource(inOut._1), graphIri, outputFile, outputFormat)
}
readFromModel.start()
writeToFile.start()
val timeout = 10.minutes
readFromModel.join(timeout.toMillis)
writeToFile.join(timeout.toMillis)
}
} yield FileWrittenResponse()
}
): Task[FileWrittenResponse] = ZIO.scoped {
for {
fos <- fileOutputStream(outputFile)
ds <- datasetRef.get
lang = JenaFormatUtil.rdfFormatToJenaParsingLang(outputFormat)
_ <- ZIO.attemptBlocking {
ds.begin(ReadWrite.READ)
try { ds.getNamedModel(graphIri).write(fos, lang.getName) }
finally { ds.end() }
}
} yield FileWrittenResponse()
}

override def sparqlHttpGraphData(graphIri: IRI): Task[NamedGraphDataResponse] = ZIO.scoped {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,4 @@ object ZScopedJavaIoStreams {
def acquire = ZIO.attempt(Files.newOutputStream(path))
ZIO.acquireRelease(acquire)(release).flatMap(os => bufferedOutputStream(os))
}

/**
* Creates a [[PipedInputStream]] so that it is connected to the piped output stream `out`.
* @param out The piped output stream to connect to.
* @return The managed piped input stream.
*/
def pipedInputStream(out: PipedOutputStream): ZIO[Any with Scope, Throwable, PipedInputStream] = {
def acquire = ZIO.attempt(new PipedInputStream(out))
ZIO.acquireRelease(acquire)(release)
}

/**
* Creates a piped output stream that is not yet connected to a
* piped input stream. It must be connected to a piped input stream,
* either by the receiver or the sender, before being used.
*/
def pipedOutStream(): ZIO[Any with Scope, Throwable, PipedOutputStream] = {
def acquire = ZIO.attempt(new PipedOutputStream())
ZIO.acquireRelease(acquire)(release)
}

/**
* Creates a piped output stream that is connected to a piped input stream.
*/
def outputStreamPipedToInputStream(): ZIO[Any with Scope, Throwable, (PipedInputStream, PipedOutputStream)] =
pipedOutStream().flatMap(out => pipedInputStream(out).map((_, out)))
}

0 comments on commit 423b352

Please sign in to comment.