Skip to content

Commit

Permalink
Merge pull request #8 from exini/bugfix/stop-tag
Browse files Browse the repository at this point in the history
Moved stop tag handling to separate flow where it can be handled appropriately
  • Loading branch information
karl-exini committed Jun 14, 2019
2 parents 33e0de4 + 422e174 commit 4709600
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 61 deletions.
30 changes: 23 additions & 7 deletions src/main/scala/com/exini/dicom/streams/DicomFlows.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ object DicomFlows {
a
}

/**
 * Flow for stopping processing early once a tag has been reached. Attributes in sequences are ignored.
 *
 * @param tag tag number referring to the root dataset at which processing stops (exclusive)
 * @return the associated flow
 */
def stopTagFlow(tag: Int): PartFlow =
  DicomFlowFactory.create(new IdentityFlow with InSequence[DicomPart] {
    override def onHeader(part: HeaderPart): List[DicomPart] = {
      // Compare tags as unsigned values: DICOM tags with group number >= 0x8000
      // are negative when held in a signed Int, so a plain signed `>=` would
      // never trigger the stop condition for them. The stop-tag logic this flow
      // replaces in ParseFlow used intToUnsignedLong for exactly this reason.
      if (!inSequence && intToUnsignedLong(part.tag) >= intToUnsignedLong(tag))
        DicomEndMarker :: Nil // emit marker; takeWhile below terminates the stream before it
      else
        super.onHeader(part)
    }
  }).takeWhile(_ != DicomEndMarker)

/**
* Filter a stream of dicom parts such that all elements except those with tags in the white list are discarded.
*
Expand Down Expand Up @@ -351,13 +367,13 @@ object DicomFlows {
Nil

case p if !hasEmitted && p.bytes.nonEmpty =>
hasEmitted = true
p match {
case preamble: PreamblePart =>
preamble :: fmi
case _ =>
fmi ::: p :: Nil
}
hasEmitted = true
p match {
case preamble: PreamblePart =>
preamble :: fmi
case _ =>
fmi ::: p :: Nil
}

case p =>
p :: Nil
Expand Down
15 changes: 6 additions & 9 deletions src/main/scala/com/exini/dicom/streams/ParseFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import com.exini.dicom.data.DicomParts._
import com.exini.dicom.data.VR.VR
import com.exini.dicom.data._

class ParseFlow private(chunkSize: Int, stopTag: Option[Long]) extends ByteStringParser[DicomPart] {
class ParseFlow private(chunkSize: Int) extends ByteStringParser[DicomPart] {

import ByteStringParser._

Expand Down Expand Up @@ -173,9 +173,7 @@ class ParseFlow private(chunkSize: Int, stopTag: Option[Long]) extends ByteStrin
case class InDatasetHeader(state: DatasetHeaderState) extends DicomParseStep {
private def readDatasetHeader(reader: ByteReader, state: DatasetHeaderState): Option[DicomPart] = {
val (tag, vr, headerLength, valueLength) = readHeader(reader, state)
if (stopTag.isDefined && intToUnsignedLong(tag) >= stopTag.get && intToUnsignedLong(tag) < intToUnsignedLong(Tag.Item))
None
else if (vr != null) {
if (vr != null) {
val bytes = reader.take(headerLength)
if (vr == VR.SQ || vr == VR.UN && valueLength == indeterminateLength)
Some(SequencePart(tag, valueLength, state.bigEndian, state.explicitVR, bytes))
Expand Down Expand Up @@ -310,16 +308,15 @@ object ParseFlow {
* items, sequences and fragments.
*
* @param chunkSize the maximum size of a DICOM element data chunk
* @param stopTag optional stop tag (exclusive) after which reading of incoming data bytes is stopped
* @param inflate indicates whether deflated DICOM data should be deflated and parsed or passed on as deflated data chunks.
*/
def apply(chunkSize: Int = 8192, stopTag: Option[Long] = None, inflate: Boolean = true): Flow[ByteString, DicomPart, NotUsed] =
def apply(chunkSize: Int = 8192, inflate: Boolean = true): Flow[ByteString, DicomPart, NotUsed] =
if (inflate)
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._

val parser1 = builder.add(new ParseFlow(chunkSize, stopTag))
val parser2 = new ParseFlow(chunkSize, stopTag)
val parser1 = builder.add(new ParseFlow(chunkSize))
val parser2 = new ParseFlow(chunkSize)

val decider = builder.add(Flow[DicomPart]
.statefulMapConcat(() => {
Expand All @@ -346,7 +343,7 @@ object ParseFlow {
FlowShape(parser1.in, merge.out)
})
else
Flow[ByteString].via(new ParseFlow(chunkSize, stopTag))
Flow[ByteString].via(new ParseFlow(chunkSize))

val parseFlow = apply()

Expand Down
49 changes: 48 additions & 1 deletion src/test/scala/com/exini/dicom/streams/DicomFlowsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import akka.stream.scaladsl.{FileIO, Source}
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import com.exini.dicom.data.DicomParts.{DicomPart, MetaPart}
import com.exini.dicom.data.TestData._
import com.exini.dicom.data._
import com.exini.dicom.streams.DicomFlows._
import com.exini.dicom.streams.ModifyFlow._
import com.exini.dicom.streams.ParseFlow._
import com.exini.dicom.streams.TestUtils._
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.concurrent.ExecutionContextExecutor

Expand Down Expand Up @@ -45,6 +45,53 @@ class DicomFlowsTest extends TestKit(ActorSystem("DicomFlowsSpec")) with FlatSpe
.expectDicomComplete()
}

"The stop tag flow" should "stop reading data when a stop tag is reached" in {
  // Two root-level attributes; the stop tag equals the second one, so only the first is emitted.
  val data = studyDate() ++ patientNameJohnDoe()

  Source.single(data)
    .via(parseFlow)
    .via(stopTagFlow(Tag.PatientName))
    .runWith(TestSink.probe[DicomPart])
    .expectHeader(Tag.StudyDate)
    .expectValueChunk()
    .expectDicomComplete()
}

it should "stop reading data when a tag number is higher than the stop tag" in {
  // The stop tag falls strictly between the two attributes; stopping must also
  // trigger on the first tag that merely exceeds it, not only on an exact match.
  val data = studyDate() ++ patientNameJohnDoe()

  Source.single(data)
    .via(parseFlow)
    .via(stopTagFlow(Tag.StudyDate + 1))
    .runWith(TestSink.probe[DicomPart])
    .expectHeader(Tag.StudyDate)
    .expectValueChunk()
    .expectDicomComplete()
}

it should "apply stop tag correctly also when preceded by sequence and ignore tags in sequences" in {
  // PixelData inside the sequence has a tag number far above the stop tag; it must
  // be passed through because stop-tag comparison only applies to the root dataset.
  // The trailing root-level PixelData, however, must be cut off.
  val data = studyDate() ++
    sequence(Tag.DerivationCodeSequence) ++ item() ++ pixelData(10) ++
    itemDelimitation() ++ sequenceDelimitation() ++
    patientNameJohnDoe() ++ pixelData(100)

  Source.single(data)
    .via(parseFlow)
    .via(stopTagFlow(Tag.PatientName + 1))
    .runWith(TestSink.probe[DicomPart])
    .expectHeader(Tag.StudyDate)
    .expectValueChunk()
    .expectSequence(Tag.DerivationCodeSequence)
    .expectItem(1)
    .expectHeader(Tag.PixelData)
    .expectValueChunk()
    .expectItemDelimitation()
    .expectSequenceDelimitation()
    .expectHeader(Tag.PatientName)
    .expectValueChunk()
    .expectDicomComplete()
}

"The DICOM group length discard filter" should "discard group length elements except 0002,0000" in {
val bytes = preamble ++ fmiGroupLength(transferSyntaxUID()) ++ transferSyntaxUID() ++ groupLength(8, studyDate().length) ++ studyDate()

Expand Down
44 changes: 0 additions & 44 deletions src/test/scala/com/exini/dicom/streams/ParseFlowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -354,50 +354,6 @@ class ParseFlowTest extends TestKit(ActorSystem("ParseFlowSpec")) with FlatSpecL
.expectDicomComplete()
}

it should "stop reading data when a stop tag is reached" in {
val bytes = studyDate() ++ patientNameJohnDoe()

val source = Source.single(bytes)
.via(ParseFlow(stopTag = Some(Tag.PatientName)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.StudyDate)
.expectValueChunk()
.expectDicomComplete()
}

it should "stop reading data when a tag number is higher than the stop tag" in {
val bytes = studyDate() ++ patientNameJohnDoe()

val source = Source.single(bytes)
.via(ParseFlow(stopTag = Some(Tag.StudyDate + 1)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.StudyDate)
.expectValueChunk()
.expectDicomComplete()
}

it should "apply stop tag correctly also when preceded by sequence" in {
val bytes = studyDate() ++ sequence(Tag.DerivationCodeSequence) ++ item() ++ studyDate() ++ itemDelimitation() ++ sequenceDelimitation() ++ patientNameJohnDoe() ++ pixelData(100)

val source = Source.single(bytes)
.via(ParseFlow(stopTag = Some(Tag.PatientName + 1)))

source.runWith(TestSink.probe[DicomPart])
.expectHeader(Tag.StudyDate)
.expectValueChunk()
.expectSequence(Tag.DerivationCodeSequence)
.expectItem(1)
.expectHeader(Tag.StudyDate)
.expectValueChunk()
.expectItemDelimitation()
.expectSequenceDelimitation()
.expectHeader(Tag.PatientName)
.expectValueChunk()
.expectDicomComplete()
}

it should "chunk value data according to max chunk size" in {
val bytes = preamble ++ fmiGroupLength(transferSyntaxUID()) ++ transferSyntaxUID() ++ patientNameJohnDoe()

Expand Down

0 comments on commit 4709600

Please sign in to comment.