Skip to content

Commit

Permalink
Changed flows that alter lengths to use the group length warning flow…
Browse files Browse the repository at this point in the history
… for better user feedback
  • Loading branch information
karl-exini committed Oct 12, 2020
1 parent 936f98a commit ff7dfa8
Showing 1 changed file with 125 additions and 118 deletions.
243 changes: 125 additions & 118 deletions src/main/scala/com/exini/dicom/streams/DicomFlows.scala
Expand Up @@ -520,48 +520,52 @@ object DicomFlows {
Seq(TagInsertion(TagPath.fromTag(Tag.SpecificCharacterSet), _ => ByteString("ISO_IR 192")))
)
)
.statefulMapConcat { () =>
var characterSets: CharacterSets = defaultCharacterSet
var currentHeader: Option[HeaderPart] = None
var currentValue = ByteString.empty
.via(new DeferToPartFlow[DicomPart] with GroupLengthWarnings[DicomPart] {
override def createLogic(attr: Attributes): GraphStageLogic =
new DeferToPartLogic with GroupLengthWarningsLogic {
var characterSets: CharacterSets = defaultCharacterSet
var currentHeader: Option[HeaderPart] = None
var currentValue: ByteString = ByteString.empty

{
case attr: ElementsPart if attr.label == "toutf8" =>
characterSets = attr
.elements(Tag.SpecificCharacterSet)
.map {
case e: ValueElement => CharacterSets(e)
case _ => characterSets
override def onPart(part: DicomPart): List[DicomPart] =
part match {
case attr: ElementsPart if attr.label == "toutf8" =>
characterSets = attr
.elements(Tag.SpecificCharacterSet)
.map {
case e: ValueElement => CharacterSets(e)
case _ => characterSets
}
.getOrElse(characterSets)
Nil
case header: HeaderPart =>
if (header.length > 0 && CharacterSets.isVrAffectedBySpecificCharacterSet(header.vr)) {
currentHeader = Some(header)
currentValue = ByteString.empty
Nil
} else {
currentHeader = None
header :: Nil
}
case value: ValueChunk if currentHeader.isDefined =>
currentValue = currentValue ++ value.bytes
if (value.last) {
val newValue = currentHeader
.map(h => characterSets.decode(h.vr, currentValue).getBytes(utf8Charset))
.map(ByteString.apply)
val newLength = newValue.map(_.length)
val newElement = for {
h <- currentHeader
v <- newValue
l <- newLength
} yield h.withUpdatedLength(l) :: ValueChunk(h.bigEndian, v, last = true) :: Nil
newElement.getOrElse(Nil)
} else Nil
case p: DicomPart =>
p :: Nil
}
.getOrElse(characterSets)
Nil
case header: HeaderPart =>
if (header.length > 0 && CharacterSets.isVrAffectedBySpecificCharacterSet(header.vr)) {
currentHeader = Some(header)
currentValue = ByteString.empty
Nil
} else {
currentHeader = None
header :: Nil
}
case value: ValueChunk if currentHeader.isDefined =>
currentValue = currentValue ++ value.bytes
if (value.last) {
val newValue = currentHeader
.map(h => characterSets.decode(h.vr, currentValue).getBytes(utf8Charset))
.map(ByteString.apply)
val newLength = newValue.map(_.length)
val newElement = for {
h <- currentHeader
v <- newValue
l <- newLength
} yield h.withUpdatedLength(l) :: ValueChunk(h.bigEndian, v, last = true) :: Nil
newElement.getOrElse(Nil)
} else Nil
case p: DicomPart =>
p :: Nil
}
}
}
})

/**
* Ensure that the data has transfer syntax explicit VR little endian. Changes the TransferSyntaxUID, if present,
Expand All @@ -581,92 +585,95 @@ object DicomFlows {
_ => padToEvenLength(ByteString(UID.ExplicitVRLittleEndian), VR.UI)
)
)
).statefulMapConcat {
).via(new DeferToPartFlow[DicomPart] with GroupLengthWarnings[DicomPart] {

case class SwapResult(bytes: ByteString, carry: ByteString)

def swap(k: Int, b: ByteString): SwapResult =
SwapResult(b.grouped(k).map(_.reverse).reduce(_ ++ _), b.takeRight(b.length % k))

() =>
var currentVr: Option[VR] = None
var carryBytes = ByteString.empty
override def createLogic(attr: Attributes): GraphStageLogic =
new DeferToPartLogic with GroupLengthWarningsLogic {
var currentVr: Option[VR] = None
var carryBytes: ByteString = ByteString.empty

def updatedValue(swapResult: SwapResult, last: Boolean): ValueChunk = {
carryBytes = swapResult.carry
if (last && carryBytes.nonEmpty)
throw new DicomStreamException("Dicom value length does not match length specified in header")
ValueChunk(bigEndian = false, swapResult.bytes, last = last)
}

def updatedValue(swapResult: SwapResult, last: Boolean): ValueChunk = {
carryBytes = swapResult.carry
if (last && carryBytes.nonEmpty)
throw new DicomStreamException("Dicom value length does not match length specified in header")
ValueChunk(bigEndian = false, swapResult.bytes, last = last)
}
override def onPart(part: DicomPart): List[DicomPart] =
part match {
case h: HeaderPart if h.bigEndian || !h.explicitVR =>
if (h.bigEndian) {
carryBytes = ByteString.empty
currentVr = Some(h.vr)
} else currentVr = None
HeaderPart(h.tag, h.vr, h.length, h.isFmi) :: Nil

case v: ValueChunk if currentVr.isDefined && v.bigEndian =>
currentVr match {
case Some(vr) if vr == VR.US || vr == VR.SS || vr == VR.OW || vr == VR.AT => // 2 byte
updatedValue(swap(2, carryBytes ++ v.bytes), v.last) :: Nil
case Some(vr) if vr == VR.OF || vr == VR.UL || vr == VR.SL || vr == VR.FL => // 4 bytes
updatedValue(swap(4, carryBytes ++ v.bytes), v.last) :: Nil
case Some(vr) if vr == VR.OD || vr == VR.FD => // 8 bytes
updatedValue(swap(8, carryBytes ++ v.bytes), v.last) :: Nil
case _ => v :: Nil
}

{
case h: HeaderPart if h.bigEndian || !h.explicitVR =>
if (h.bigEndian) {
carryBytes = ByteString.empty
currentVr = Some(h.vr)
} else currentVr = None
HeaderPart(h.tag, h.vr, h.length, h.isFmi) :: Nil

case v: ValueChunk if currentVr.isDefined && v.bigEndian =>
currentVr match {
case Some(vr) if vr == VR.US || vr == VR.SS || vr == VR.OW || vr == VR.AT => // 2 byte
updatedValue(swap(2, carryBytes ++ v.bytes), v.last) :: Nil
case Some(vr) if vr == VR.OF || vr == VR.UL || vr == VR.SL || vr == VR.FL => // 4 bytes
updatedValue(swap(4, carryBytes ++ v.bytes), v.last) :: Nil
case Some(vr) if vr == VR.OD || vr == VR.FD => // 8 bytes
updatedValue(swap(8, carryBytes ++ v.bytes), v.last) :: Nil
case _ => v :: Nil
case p: DicomPart if !p.bigEndian => p :: Nil

case s: SequencePart =>
SequencePart(
s.tag,
s.length,
bigEndian = false,
explicitVR = true,
tagToBytesLE(s.tag) ++ ByteString('S', 'Q', 0, 0) ++ s.bytes.takeRight(4).reverse
) :: Nil

case _: SequenceDelimitationPart =>
SequenceDelimitationPart(
bigEndian = false,
tagToBytesLE(Tag.SequenceDelimitationItem) ++ ByteString(0, 0, 0, 0)
) :: Nil

case i: ItemPart =>
ItemPart(
i.index,
i.length,
bigEndian = false,
tagToBytesLE(Tag.Item) ++ i.bytes.takeRight(4).reverse
) :: Nil

case i: ItemDelimitationPart =>
ItemDelimitationPart(
i.index,
bigEndian = false,
tagToBytesLE(Tag.ItemDelimitationItem) ++ ByteString(0, 0, 0, 0)
) :: Nil

case f: FragmentsPart =>
if (f.bigEndian) {
carryBytes = ByteString.empty
currentVr = Some(f.vr)
} else currentVr = None
FragmentsPart(
f.tag,
f.length,
f.vr,
bigEndian = false,
explicitVR = true,
tagToBytesLE(f.tag) ++ f.bytes.drop(4).take(4) ++ f.bytes.takeRight(4).reverse
) :: Nil

case p => p :: Nil
}

case p: DicomPart if !p.bigEndian => p :: Nil

case s: SequencePart =>
SequencePart(
s.tag,
s.length,
bigEndian = false,
explicitVR = true,
tagToBytesLE(s.tag) ++ ByteString('S', 'Q', 0, 0) ++ s.bytes.takeRight(4).reverse
) :: Nil

case _: SequenceDelimitationPart =>
SequenceDelimitationPart(
bigEndian = false,
tagToBytesLE(Tag.SequenceDelimitationItem) ++ ByteString(0, 0, 0, 0)
) :: Nil

case i: ItemPart =>
ItemPart(
i.index,
i.length,
bigEndian = false,
tagToBytesLE(Tag.Item) ++ i.bytes.takeRight(4).reverse
) :: Nil

case i: ItemDelimitationPart =>
ItemDelimitationPart(
i.index,
bigEndian = false,
tagToBytesLE(Tag.ItemDelimitationItem) ++ ByteString(0, 0, 0, 0)
) :: Nil

case f: FragmentsPart =>
if (f.bigEndian) {
carryBytes = ByteString.empty
currentVr = Some(f.vr)
} else currentVr = None
FragmentsPart(
f.tag,
f.length,
f.vr,
bigEndian = false,
explicitVR = true,
tagToBytesLE(f.tag) ++ f.bytes.drop(4).take(4) ++ f.bytes.takeRight(4).reverse
) :: Nil

case p => p :: Nil
}
}
})
.via(fmiGroupLengthFlow)

/**
Expand All @@ -680,9 +687,9 @@ object DicomFlows {
*/
val toEvenValueLengthFlow: PartFlow =
partFlow
.via(new DeferToPartFlow[DicomPart] with InFragments[DicomPart] {
.via(new DeferToPartFlow[DicomPart] with InFragments[DicomPart] with GroupLengthWarnings[DicomPart] {
override def createLogic(attr: Attributes): GraphStageLogic =
new DeferToPartLogic with InFragmentsLogic {
new DeferToPartLogic with InFragmentsLogic with GroupLengthWarningsLogic {
var isOdd: Boolean = false
var pad: ByteString = ByteString(0)

Expand Down

0 comments on commit ff7dfa8

Please sign in to comment.