From ff7dfa87d399b3f001cd11f7d49ef5de5b105638 Mon Sep 17 00:00:00 2001 From: "karl.sjostrand@exini.com" Date: Mon, 12 Oct 2020 16:12:24 -0400 Subject: [PATCH] Changed flows that alter lengths to use the group length warning flow for better user feedback --- .../com/exini/dicom/streams/DicomFlows.scala | 243 +++++++++--------- 1 file changed, 125 insertions(+), 118 deletions(-) diff --git a/src/main/scala/com/exini/dicom/streams/DicomFlows.scala b/src/main/scala/com/exini/dicom/streams/DicomFlows.scala index 460544c..c72ef82 100644 --- a/src/main/scala/com/exini/dicom/streams/DicomFlows.scala +++ b/src/main/scala/com/exini/dicom/streams/DicomFlows.scala @@ -520,48 +520,52 @@ object DicomFlows { Seq(TagInsertion(TagPath.fromTag(Tag.SpecificCharacterSet), _ => ByteString("ISO_IR 192"))) ) ) - .statefulMapConcat { () => - var characterSets: CharacterSets = defaultCharacterSet - var currentHeader: Option[HeaderPart] = None - var currentValue = ByteString.empty + .via(new DeferToPartFlow[DicomPart] with GroupLengthWarnings[DicomPart] { + override def createLogic(attr: Attributes): GraphStageLogic = + new DeferToPartLogic with GroupLengthWarningsLogic { + var characterSets: CharacterSets = defaultCharacterSet + var currentHeader: Option[HeaderPart] = None + var currentValue: ByteString = ByteString.empty - { - case attr: ElementsPart if attr.label == "toutf8" => - characterSets = attr - .elements(Tag.SpecificCharacterSet) - .map { - case e: ValueElement => CharacterSets(e) - case _ => characterSets + override def onPart(part: DicomPart): List[DicomPart] = + part match { + case attr: ElementsPart if attr.label == "toutf8" => + characterSets = attr + .elements(Tag.SpecificCharacterSet) + .map { + case e: ValueElement => CharacterSets(e) + case _ => characterSets + } + .getOrElse(characterSets) + Nil + case header: HeaderPart => + if (header.length > 0 && CharacterSets.isVrAffectedBySpecificCharacterSet(header.vr)) { + currentHeader = Some(header) + currentValue = ByteString.empty + Nil + } else { + currentHeader = None + header :: Nil + } + case value: ValueChunk if currentHeader.isDefined => + currentValue = currentValue ++ value.bytes + if (value.last) { + val newValue = currentHeader + .map(h => characterSets.decode(h.vr, currentValue).getBytes(utf8Charset)) + .map(ByteString.apply) + val newLength = newValue.map(_.length) + val newElement = for { + h <- currentHeader + v <- newValue + l <- newLength + } yield h.withUpdatedLength(l) :: ValueChunk(h.bigEndian, v, last = true) :: Nil + newElement.getOrElse(Nil) + } else Nil + case p: DicomPart => + p :: Nil } - .getOrElse(characterSets) - Nil - case header: HeaderPart => - if (header.length > 0 && CharacterSets.isVrAffectedBySpecificCharacterSet(header.vr)) { - currentHeader = Some(header) - currentValue = ByteString.empty - Nil - } else { - currentHeader = None - header :: Nil - } - case value: ValueChunk if currentHeader.isDefined => - currentValue = currentValue ++ value.bytes - if (value.last) { - val newValue = currentHeader - .map(h => characterSets.decode(h.vr, currentValue).getBytes(utf8Charset)) - .map(ByteString.apply) - val newLength = newValue.map(_.length) - val newElement = for { - h <- currentHeader - v <- newValue - l <- newLength - } yield h.withUpdatedLength(l) :: ValueChunk(h.bigEndian, v, last = true) :: Nil - newElement.getOrElse(Nil) - } else Nil - case p: DicomPart => - p :: Nil - } - } + } + }) /** * Ensure that the data has transfer syntax explicit VR little endian. Changes the TransferSyntaxUID, if present, @@ -581,92 +585,95 @@ object DicomFlows { _ => padToEvenLength(ByteString(UID.ExplicitVRLittleEndian), VR.UI) ) ) - ).statefulMapConcat { + ).via(new DeferToPartFlow[DicomPart] with GroupLengthWarnings[DicomPart] { case class SwapResult(bytes: ByteString, carry: ByteString) def swap(k: Int, b: ByteString): SwapResult = SwapResult(b.grouped(k).map(_.reverse).reduce(_ ++ _), b.takeRight(b.length % k)) - () => - var currentVr: Option[VR] = None - var carryBytes = ByteString.empty + override def createLogic(attr: Attributes): GraphStageLogic = + new DeferToPartLogic with GroupLengthWarningsLogic { + var currentVr: Option[VR] = None + var carryBytes: ByteString = ByteString.empty + + def updatedValue(swapResult: SwapResult, last: Boolean): ValueChunk = { + carryBytes = swapResult.carry + if (last && carryBytes.nonEmpty) + throw new DicomStreamException("Dicom value length does not match length specified in header") + ValueChunk(bigEndian = false, swapResult.bytes, last = last) + } - def updatedValue(swapResult: SwapResult, last: Boolean): ValueChunk = { - carryBytes = swapResult.carry - if (last && carryBytes.nonEmpty) - throw new DicomStreamException("Dicom value length does not match length specified in header") - ValueChunk(bigEndian = false, swapResult.bytes, last = last) - } + override def onPart(part: DicomPart): List[DicomPart] = + part match { + case h: HeaderPart if h.bigEndian || !h.explicitVR => + if (h.bigEndian) { + carryBytes = ByteString.empty + currentVr = Some(h.vr) + } else currentVr = None + HeaderPart(h.tag, h.vr, h.length, h.isFmi) :: Nil + + case v: ValueChunk if currentVr.isDefined && v.bigEndian => + currentVr match { + case Some(vr) if vr == VR.US || vr == VR.SS || vr == VR.OW || vr == VR.AT => // 2 byte + updatedValue(swap(2, carryBytes ++ v.bytes), v.last) :: Nil + case Some(vr) if vr == VR.OF || vr == VR.UL || vr == VR.SL || vr == VR.FL => // 4 bytes + updatedValue(swap(4, carryBytes ++ v.bytes), v.last) :: Nil + case Some(vr) if vr == VR.OD || vr == VR.FD => // 8 bytes + updatedValue(swap(8, carryBytes ++ v.bytes), v.last) :: Nil + case _ => v :: Nil + } - { - case h: HeaderPart if h.bigEndian || !h.explicitVR => - if (h.bigEndian) { - carryBytes = ByteString.empty - currentVr = Some(h.vr) - } else currentVr = None - HeaderPart(h.tag, h.vr, h.length, h.isFmi) :: Nil - - case v: ValueChunk if currentVr.isDefined && v.bigEndian => - currentVr match { - case Some(vr) if vr == VR.US || vr == VR.SS || vr == VR.OW || vr == VR.AT => // 2 byte - updatedValue(swap(2, carryBytes ++ v.bytes), v.last) :: Nil - case Some(vr) if vr == VR.OF || vr == VR.UL || vr == VR.SL || vr == VR.FL => // 4 bytes - updatedValue(swap(4, carryBytes ++ v.bytes), v.last) :: Nil - case Some(vr) if vr == VR.OD || vr == VR.FD => // 8 bytes - updatedValue(swap(8, carryBytes ++ v.bytes), v.last) :: Nil - case _ => v :: Nil + case p: DicomPart if !p.bigEndian => p :: Nil + + case s: SequencePart => + SequencePart( + s.tag, + s.length, + bigEndian = false, + explicitVR = true, + tagToBytesLE(s.tag) ++ ByteString('S', 'Q', 0, 0) ++ s.bytes.takeRight(4).reverse + ) :: Nil + + case _: SequenceDelimitationPart => + SequenceDelimitationPart( + bigEndian = false, + tagToBytesLE(Tag.SequenceDelimitationItem) ++ ByteString(0, 0, 0, 0) + ) :: Nil + + case i: ItemPart => + ItemPart( + i.index, + i.length, + bigEndian = false, + tagToBytesLE(Tag.Item) ++ i.bytes.takeRight(4).reverse + ) :: Nil + + case i: ItemDelimitationPart => + ItemDelimitationPart( + i.index, + bigEndian = false, + tagToBytesLE(Tag.ItemDelimitationItem) ++ ByteString(0, 0, 0, 0) + ) :: Nil + + case f: FragmentsPart => + if (f.bigEndian) { + carryBytes = ByteString.empty + currentVr = Some(f.vr) + } else currentVr = None + FragmentsPart( + f.tag, + f.length, + f.vr, + bigEndian = false, + explicitVR = true, + tagToBytesLE(f.tag) ++ f.bytes.drop(4).take(4) ++ f.bytes.takeRight(4).reverse + ) :: Nil + + case p => p :: Nil } - - case p: DicomPart if !p.bigEndian => p :: Nil - - case s: SequencePart => - SequencePart( - s.tag, - s.length, - bigEndian = false, - explicitVR = true, - tagToBytesLE(s.tag) ++ ByteString('S', 'Q', 0, 0) ++ s.bytes.takeRight(4).reverse - ) :: Nil - - case _: SequenceDelimitationPart => - SequenceDelimitationPart( - bigEndian = false, - tagToBytesLE(Tag.SequenceDelimitationItem) ++ ByteString(0, 0, 0, 0) - ) :: Nil - - case i: ItemPart => - ItemPart( - i.index, - i.length, - bigEndian = false, - tagToBytesLE(Tag.Item) ++ i.bytes.takeRight(4).reverse - ) :: Nil - - case i: ItemDelimitationPart => - ItemDelimitationPart( - i.index, - bigEndian = false, - tagToBytesLE(Tag.ItemDelimitationItem) ++ ByteString(0, 0, 0, 0) - ) :: Nil - - case f: FragmentsPart => - if (f.bigEndian) { - carryBytes = ByteString.empty - currentVr = Some(f.vr) - } else currentVr = None - FragmentsPart( - f.tag, - f.length, - f.vr, - bigEndian = false, - explicitVR = true, - tagToBytesLE(f.tag) ++ f.bytes.drop(4).take(4) ++ f.bytes.takeRight(4).reverse - ) :: Nil - - case p => p :: Nil } - } + }) .via(fmiGroupLengthFlow) /** @@ -680,9 +687,9 @@ object DicomFlows { */ val toEvenValueLengthFlow: PartFlow = partFlow - .via(new DeferToPartFlow[DicomPart] with InFragments[DicomPart] { + .via(new DeferToPartFlow[DicomPart] with InFragments[DicomPart] with GroupLengthWarnings[DicomPart] { override def createLogic(attr: Attributes): GraphStageLogic = - new DeferToPartLogic with InFragmentsLogic { + new DeferToPartLogic with InFragmentsLogic with GroupLengthWarningsLogic { var isOdd: Boolean = false var pad: ByteString = ByteString(0)