Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CsvReader: provide correct logical and physical row numbers. #1837

Merged
merged 1 commit into from
Jan 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;

/**
* This class is used to traverse over text from a Reader, understanding both field and line delimiters, as well as the
Expand Down Expand Up @@ -59,10 +58,17 @@ final class CellGrabber {
* A side buffer we have to use for edge cases. Normally we try to return a {@link ByteSlice} which shares our
* buffer[] array. But we can't do that when the input cell spans more than one buffer[] chunk, or when the input
* cell does not exactly represent the output. This latter case can happen for example when an escaped quote ("")
* needs to be returned as a single quotation mark ("). So if our input is hello""there, then we can't return
* directly return a slice of the input array, because actually we need hello"there.
* needs to be returned as a single quotation mark ("). So if our input is hello""there, then we can't directly
* return a slice of the input array, because actually we need hello"there (one quotation mark, not two).
*/
private final GrowableByteBuffer spillBuffer;
/**
* Zero-based row number of the input stream. This is for informational purposes only and in particular does NOT
* refer to the number of data rows in the input. (This is because the data rows may be split across multiple lines
* and because there may or may not be headers). We track this number for the benefit of the caller, who may want to
* issue an informative error message when there is a problem.
*/
private int physicalRowNum;

/**
* Constructor.
Expand All @@ -79,6 +85,7 @@ public CellGrabber(final InputStream inputStream, final byte quoteChar, final by
this.offset = 0;
this.startOffset = 0;
this.spillBuffer = new GrowableByteBuffer();
this.physicalRowNum = 0;
}

/**
Expand Down Expand Up @@ -123,13 +130,24 @@ public boolean grabNext(final ByteSlice dest, final MutableBoolean lastInRow) th
*/
private void processQuotedMode(final ByteSlice dest, final MutableBoolean lastInRow) throws CsvReaderException {
startOffset = offset;
boolean prevCharWasCarriageReturn = false;
while (true) {
if (offset == size) {
if (!tryEnsureMore()) {
throw new CsvReaderException("Cell did not have closing quote character");
}
}
final byte ch = buffer[offset++];
// Maintain a correct row number. This is somehat tricky.
if (ch == '\r') {
++physicalRowNum;
prevCharWasCarriageReturn = true;
} else {
if (ch == '\n' && !prevCharWasCarriageReturn) {
++physicalRowNum;
}
prevCharWasCarriageReturn = false;
}
if (ch != quoteChar) {
// Ordinary character. Note: in quoted mode we will gladly eat field and line separators.
continue;
Expand Down Expand Up @@ -227,6 +245,7 @@ private void finishField(final ByteSlice dest, final MutableBoolean lastInRow) t
finish(dest);
++offset;
lastInRow.setValue(true);
++physicalRowNum;
return;
}
if (ch == '\r') {
Expand All @@ -239,6 +258,7 @@ private void finishField(final ByteSlice dest, final MutableBoolean lastInRow) t
}
}
lastInRow.setValue(true);
++physicalRowNum;
return;
}
++offset;
Expand Down Expand Up @@ -303,6 +323,10 @@ private void finish(final ByteSlice dest) {
dest.reset(spillBuffer.data(), 0, spillBuffer.size());
}

public int physicalRowNum() {
return physicalRowNum;
}

/**
* Trim whitespace from the front and back of the slice.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.deephaven.csv.util.CsvReaderException;
import io.deephaven.csv.util.Renderer;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;

import java.io.InputStream;
Expand All @@ -20,7 +19,6 @@
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* A class for reading CSV data. Typical usage is:
Expand Down Expand Up @@ -150,8 +148,7 @@ public Result read(final InputStream stream, final SinkFactory sinkFactory) thro
final CellGrabber grabber = new CellGrabber(stream, quoteChar, fieldDelimiter, ignoreSurroundingSpaces, trim);
// For an "out" parameter
final MutableObject<byte[][]> firstDataRowHolder = new MutableObject<>();
final MutableLong startingRowNumHolder = new MutableLong();
final String[] headersTemp = determineHeadersToUse(grabber, firstDataRowHolder, startingRowNumHolder);
final String[] headersTemp = determineHeadersToUse(grabber, firstDataRowHolder);
final byte[][] firstDataRow = firstDataRowHolder.getValue();
final int numInputCols = headersTemp.length;

Expand Down Expand Up @@ -186,8 +183,7 @@ public Result read(final InputStream stream, final SinkFactory sinkFactory) thro
concurrent ? Executors.newFixedThreadPool(numOutputCols + 1) : Executors.newSingleThreadExecutor();

final Future<Long> numRowsFuture = exec.submit(
() -> ParseInputToDenseStorage.doit(firstDataRow, nullValueLiteral, startingRowNumHolder.longValue(),
grabber, dsws));
() -> ParseInputToDenseStorage.doit(firstDataRow, nullValueLiteral, grabber, dsws));


final ArrayList<Future<Sink<?>>> sinkFutures = new ArrayList<>();
Expand Down Expand Up @@ -254,18 +250,15 @@ private String calcNullValueLiteralToUse(final String columnName, final int oneB
* Determine which headers to use. The result comes from either the first row of the file or the user-specified
* overrides.
*/
private String[] determineHeadersToUse(final CellGrabber grabber, final MutableObject<byte[][]> firstDataRowHolder,
final MutableLong startingRowNumHolder)
private String[] determineHeadersToUse(final CellGrabber grabber, final MutableObject<byte[][]> firstDataRowHolder)
throws CsvReaderException {
String[] headersToUse = null;
long startingRowNum = 0;
if (hasHeaders) {
final byte[][] firstRow = tryReadOneRow(grabber);
if (firstRow == null) {
throw new CsvReaderException("Can't proceed because hasHeaders is set but input file is empty");
}
headersToUse = Arrays.stream(firstRow).map(String::new).toArray(String[]::new);
startingRowNum = 1;
}

// Whether or not the input had headers, maybe override with client-specified headers.
Expand Down Expand Up @@ -296,7 +289,6 @@ private String[] determineHeadersToUse(final CellGrabber grabber, final MutableO
}

firstDataRowHolder.setValue(firstDataRow);
startingRowNumHolder.setValue(startingRowNum);
return headersToUse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public class ParseInputToDenseStorage {
* {@link DenseStorageWriter} is null, then instead of passing data to it, we confirm that the data is the
* empty string and then just drop the data. This is used to handle input files that have a trailing empty
* column on the right.
* @return The number of rows in the input.
* @return The number of data rows in the input (i.e. not including headers or strings split across multiple lines).
*/
public static long doit(final byte[][] optionalFirstDataRow, final String nullValueLiteral,
final long startingRowNum, final CellGrabber grabber,
final CellGrabber grabber,
final DenseStorageWriter[] dsws) throws CsvReaderException {
final ByteSlice slice = new ByteSlice();
final int numCols = dsws.length;
Expand All @@ -45,8 +45,9 @@ public static long doit(final byte[][] optionalFirstDataRow, final String nullVa
final byte[] nullValueBytes = nullValueLiteral.getBytes(StandardCharsets.UTF_8);
final ByteSlice nullSlice = new ByteSlice(nullValueBytes, 0, nullValueBytes.length);

// Zero-based row number.
long rowNum = startingRowNum;
// This is the number of data rows read.
long logicalRowNum = 0;

// There is a case (namely when the file has no headers and the client hasn't specified
// them either) where the CsvReader was forced to read the first row of data from the file
// in order to determine the number of columns. If this happened, optionalFirstDataRow will
Expand All @@ -62,13 +63,19 @@ public static long doit(final byte[][] optionalFirstDataRow, final String nullVa
slice.reset(temp, 0, temp.length);
appendToDenseStorageWriter(dsws[ii], slice);
}
++rowNum;
++logicalRowNum;
}

// Grab the remaining lines and store them.
// The outer while is the "row" iteration.
final MutableBoolean lastInRow = new MutableBoolean();
OUTER: while (true) {
// As we start processing the next data row, grab the row number from the CellGrabber. This number refers
// to the (zero-based) "physical" row number of the file. Now is a logical time to grab that number, because
// a "logical" data row may span multiple "physical" rows, and if we have to report an error to the caller,
// it's clearest if we record the physical row number where the logical row started.
final long physicalRowNum = grabber.physicalRowNum();

// Zero-based column number.
int colNum = 0;

Expand All @@ -88,7 +95,8 @@ public static long doit(final byte[][] optionalFirstDataRow, final String nullVa
if (colNum == numCols) {
if (!lastInRow.booleanValue()) {
throw new CsvReaderException(
String.format("Row %d has too many columns (expected %d)", rowNum + 1, numCols));
String.format("Row %d has too many columns (expected %d)", physicalRowNum + 1,
numCols));
}
break;
}
Expand All @@ -103,18 +111,18 @@ public static long doit(final byte[][] optionalFirstDataRow, final String nullVa
}
} catch (Exception e) {
final String message = String.format("While processing row %d, column %d:",
rowNum + 1, colNum + 1);
physicalRowNum + 1, colNum + 1);
throw new CsvReaderException(message, e);
}
++rowNum;
++logicalRowNum;
}
for (DenseStorageWriter dsw : dsws) {
if (dsw != null) {
dsw.finish();
}
}

return rowNum;
return logicalRowNum;
}

private static void appendToDenseStorageWriter(final DenseStorageWriter dsw, final ByteSlice bs)
Expand Down
104 changes: 87 additions & 17 deletions extensions/csv/src/test/java/io/deephaven/csv/CsvReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,70 @@ private static class Sentinels {
public static final long NULL_TIMESTAMP_AS_LONG = Long.MIN_VALUE;
}

@Test
public void countsAreCorrect() throws CsvReaderException {
final String input = "" +
"Values\n" +
"1\n" +
"\n" +
"3\n";
final CsvReader.Result result = parse(defaultCsvReader(), toInputStream(input));
Assertions.assertThat(result.numCols()).isEqualTo(1);
Assertions.assertThat(result.numRows()).isEqualTo(3);
}

@Test
public void countsAreCorrectNoTrailingNewline() throws CsvReaderException {
final String input = "" +
"Values\n" +
"1\n" +
"\n" +
"3";
final CsvReader.Result result = parse(defaultCsvReader(), toInputStream(input));
Assertions.assertThat(result.numCols()).isEqualTo(1);
Assertions.assertThat(result.numRows()).isEqualTo(3);
}

@Test
public void countsAreCorrectHeaderless() throws CsvReaderException {
final String input = "" +
"1\n" +
"\n" +
"3\n";
final CsvReader.Result result =
parse(defaultCsvReader().setHasHeaders(false).setHeaders("Value"), toInputStream(input));
Assertions.assertThat(result.numCols()).isEqualTo(1);
Assertions.assertThat(result.numRows()).isEqualTo(3);
}

@Test
public void multilineColumnName() throws CsvReaderException {
final String input = "" +
"|Some\nInts|,|Some\rStrings|,|Some\r\nBools|,|Some\r\n\nDoubles|\n" +
"-3,foo,false,1.0\n" +
"4,bar,true,2.0\n" +
"-5,baz,false,3.0\n";
final CsvReader.Result result = parse(defaultCsvReader().setquoteChar('|'), toInputStream(input));
final ColumnSet cs = toColumnSet(result);
Assertions.assertThat(cs.columns[0].name).isEqualTo("Some\nInts");
Assertions.assertThat(cs.columns[1].name).isEqualTo("Some\rStrings");
Assertions.assertThat(cs.columns[2].name).isEqualTo("Some\r\nBools");
Assertions.assertThat(cs.columns[3].name).isEqualTo("Some\r\n\nDoubles");
}

@Test
public void multilineColumnNameReportsCorrectRowNumber() {
// Too many columns is an error.
final String input = "" +
"|Some\nInts|,|Some\rStrings|,|Some\r\nBools|,|Some\r\n\nDoubles|\n" +
"-3,foo,false,1.0\n" +
"4,bar,true,2.0,quz\n" +
"-5,baz,false,3.0\n";
Assertions.assertThatThrownBy(() -> parse(defaultCsvReader().setquoteChar('|'), toInputStream(input)))
.hasRootCauseMessage("Row 8 has too many columns (expected 4)");
}


private static final String BOOLEAN_INPUT = "" +
"Values\n" +
"true\n" +
Expand Down Expand Up @@ -400,8 +464,9 @@ public void multi() throws CsvReaderException {
final boolean oneCharIJK = oneCharIJ && entriesAreAllNullOrOneChar[kk];
final Class<?> expectedType = SimpleInferrer.infer(expectedTypes[kk], inferredIJ, oneCharIJK);
final String input = "Values\n" + allInputs[ii] + allInputs[jj] + allInputs[kk];
final InputStream inputStream = toInputStream(input);
final CsvReader csvReader = defaultCsvReader().setParsers(Parsers.COMPLETE);
final ColumnSet columnSet = parse(csvReader, input);
final ColumnSet columnSet = toColumnSet(parse(csvReader, inputStream));
final Class<?> actualType = columnSet.columns[0].reinterpretedType;
Assertions.assertThat(actualType)
.withFailMessage("Expected to infer type %s; actually inferred %s. Failing input: %s",
Expand Down Expand Up @@ -1531,32 +1596,37 @@ private static CsvReader defaultCsvReader() {
}

private static void invokeTest(CsvReader csvReader, String input, ColumnSet expected) throws CsvReaderException {
final ColumnSet actual = parse(csvReader, input);
final InputStream inputStream = toInputStream(input);
final CsvReader.Result result = parse(csvReader, inputStream);
final ColumnSet actual = toColumnSet(result);
final String expectedToString = expected.toString();
final String actualToString = actual.toString();
Assertions.assertThat(actualToString).isEqualTo(expectedToString);
}

private static ColumnSet parse(CsvReader csvReader, String input) throws CsvReaderException {
final StringReader reader = new StringReader(input);
final ReaderInputStream inputStream = new ReaderInputStream(reader, StandardCharsets.UTF_8);
return parse(csvReader, inputStream);
}

/**
* Parses {@code reader} according to the specifications of {@code this}. The {@code reader} will be closed upon
* return.
*
* <p>
* Note: this implementation will buffer the {@code reader} internally.
*
* Parses {@code inputStream} according to the specifications of {@code csvReader}.
*
* @param inputStream the input stream.
* @return the new table
* @return The parsed data
* @throws CsvReaderException If any sort of failure occurs.
*/
private static ColumnSet parse(CsvReader csvReader, InputStream inputStream) throws CsvReaderException {
final CsvReader.Result result = csvReader.read(inputStream, makeMySinkFactory());
private static CsvReader.Result parse(CsvReader csvReader, InputStream inputStream) throws CsvReaderException {
return csvReader.read(inputStream, makeMySinkFactory());
}

/**
* Convert String to InputStream
*/
private static InputStream toInputStream(final String input) {
final StringReader reader = new StringReader(input);
return new ReaderInputStream(reader, StandardCharsets.UTF_8);
}

/***
* Converts the {@link CsvReader.Result} to a {@link ColumnSet}.
*/
private static ColumnSet toColumnSet(final CsvReader.Result result) {
final int numCols = result.numCols();

final String[] columnNames = result.columnNames();
Expand Down