Skip to content

Commit

Permalink
CSV: stream may end with delimiter (#1711)
Browse files Browse the repository at this point in the history
CSV: stream may end with delimiter
  • Loading branch information
2m committed May 22, 2019
2 parents f1ec152 + efa7999 commit 459a684
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 16 deletions.
32 changes: 16 additions & 16 deletions csv/src/main/scala/akka/stream/alpakka/csv/impl/CsvParser.scala
Expand Up @@ -114,19 +114,19 @@ import scala.collection.mutable
current = next.iterator
}

def poll(requireLineEnd: Boolean): Option[List[ByteString]] =
if (buffer.nonEmpty) {
val line = parseLine(requireLineEnd)
if (line.nonEmpty) {
currentLineNo += 1
if (state == LineEnd) {
state = LineStart
}
resetLine()
columns.clear()
def poll(requireLineEnd: Boolean): Option[List[ByteString]] = {
if (buffer.nonEmpty) parseLine()
val line = maybeExtractLine(requireLineEnd)
if (line.nonEmpty) {
currentLineNo += 1
if (state == LineEnd || !requireLineEnd) {
state = LineStart
}
line
} else None
resetLine()
columns.clear()
}
line
}

private[this] def advance(n: Int = 1): Unit = {
pos += n
Expand Down Expand Up @@ -198,14 +198,12 @@ import scala.collection.mutable
}
}

protected def parseLine(requireLineEnd: Boolean): Option[List[ByteString]] = {
private[this] def parseLine(): Unit = {
if (firstData) {
checkForByteOrderMark()
firstData = false
}

churn()
maybeExtractLine(requireLineEnd)
}

private[this] def churn(): Unit = {
Expand Down Expand Up @@ -446,8 +444,10 @@ import scala.collection.mutable
Some(columns.toList)
case WithinFieldEscaped | WithinQuotedFieldEscaped =>
noCharEscaped()
case _ =>
case _ if columns.nonEmpty =>
Some(columns.toList)
case _ =>
None
}
}

Expand Down
13 changes: 13 additions & 0 deletions csv/src/test/scala/akka/stream/alpakka/csv/CsvParserSpec.scala
Expand Up @@ -238,6 +238,19 @@ class CsvParserSpec extends WordSpec with Matchers with OptionValues {
exception.getMessage should be("unclosed quote at end of input 1:6, no matching quote found")
}

"accept delimiter as last input" in {
val parser = new CsvParser(delimiter = ',', quoteChar = '"', escapeChar = '\\', maximumLineLength)
parser.offer(ByteString("A,B\nA,"))
parser.poll(requireLineEnd = false).value.map(_.utf8String) should be(List("A", "B"))
parser.poll(requireLineEnd = false).value shouldBe List(ByteString("A"), ByteString.empty)
}

"accept delimiter as last input on first line" in {
val parser = new CsvParser(delimiter = ',', quoteChar = '"', escapeChar = '\\', maximumLineLength)
parser.offer(ByteString("A,"))
parser.poll(requireLineEnd = false).value shouldBe List(ByteString("A"), ByteString.empty)
}

"detect line ending correctly if input is split between CR & LF" in {
val parser = new CsvParser(delimiter = ',', quoteChar = '"', escapeChar = '\\', maximumLineLength)
parser.offer(ByteString("A,D\r"))
Expand Down
14 changes: 14 additions & 0 deletions csv/src/test/scala/docs/scaladsl/CsvParsingSpec.scala
Expand Up @@ -135,6 +135,20 @@ class CsvParsingSpec extends CsvSpec {
sink.expectComplete()
}

"read all lines without final line end and last column empty" in {
val result = Source
.single(ByteString("""eins,zwei,drei
|uno,""".stripMargin))
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String))
.runWith(Sink.seq)
.futureValue

result should have size 2
result.head should be(List("eins", "zwei", "drei"))
result(1) should be(List("uno", ""))
}

"parse Apple Numbers exported file" in assertAllStagesStopped {
val fut =
FileIO
Expand Down

0 comments on commit 459a684

Please sign in to comment.