Skip to content

Commit

Permalink
[FLINK-1168] Add support for multi-char field delimiters in CSVInputF…
Browse files Browse the repository at this point in the history
…ormats.

This commit includes parts of Cbro's pull request and subsumes PR #247

This closes #247
This closes #264
  • Loading branch information
fhueske committed Jan 26, 2015
1 parent 06b2acf commit 0548a93
Show file tree
Hide file tree
Showing 68 changed files with 564 additions and 379 deletions.
Expand Up @@ -63,7 +63,7 @@ protected void testProgram() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
.fieldDelimiter('|')
.fieldDelimiter("|")
.types(String.class, Integer.class, String.class);

//output the data with AvroOutputFormat for specific user type
Expand Down
Expand Up @@ -76,7 +76,7 @@ public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Long, Long>> input = env.readCsvFile(args[0])
.fieldDelimiter('\t').types(Long.class, Long.class);
.fieldDelimiter("\t").types(Long.class, Long.class);

DataSet<Tuple2<Long, Long>> result = input.map(
new MapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>>() {
Expand Down
Expand Up @@ -20,9 +20,6 @@
package org.apache.flink.api.common.io;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;

import org.slf4j.Logger;
Expand Down Expand Up @@ -188,27 +185,7 @@ public void setDelimiter(char delimiter) {
}

public void setDelimiter(String delimiter) {
setDelimiter(delimiter, Charsets.UTF_8);
}

public void setDelimiter(String delimiter, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new IllegalArgumentException("Charset name must not be null");
}

Charset charset = Charset.forName(charsetName);
setDelimiter(delimiter, charset);
}

public void setDelimiter(String delimiter, Charset charset) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}

this.delimiter = delimiter.getBytes(charset);
this.delimiter = delimiter.getBytes(Charsets.UTF_8);
}

public int getLineLengthLimit() {
Expand Down Expand Up @@ -281,19 +258,7 @@ public void configure(Configuration parameters) {

String delimString = parameters.getString(RECORD_DELIMITER, null);
if (delimString != null) {
String charsetName = parameters.getString(RECORD_DELIMITER_ENCODING, null);

if (charsetName == null) {
setDelimiter(delimString);
} else {
try {
setDelimiter(delimString, charsetName);
}
catch (UnsupportedCharsetException e) {
throw new IllegalArgumentException("The charset with the name '" + charsetName +
"' is not supported on this TaskManager instance.", e);
}
}
setDelimiter(delimString);
}

// set the number of samples
Expand Down Expand Up @@ -638,11 +603,6 @@ private boolean fillBuffer() throws IOException {
*/
protected static final String RECORD_DELIMITER = "delimited-format.delimiter";

/**
* The configuration key to set the record delimiter encoding.
*/
private static final String RECORD_DELIMITER_ENCODING = "delimited-format.delimiter-encoding";

/**
* The configuration key to set the number of samples to take for the statistics.
*/
Expand Down Expand Up @@ -712,24 +672,6 @@ public T recordDelimiter(String delimiter) {
return ret;
}

/**
* Sets the delimiter to be the given string. The string will be converted to bytes for more efficient
* comparison during input parsing. The conversion will be done using the charset with the given name.
* The charset must be available on the processing nodes, otherwise an exception will be raised at
* runtime.
*
* @param delimiter The delimiter string.
* @param charsetName The name of the encoding character set.
* @return The builder itself.
*/
public T recordDelimiter(String delimiter, String charsetName) {
this.config.setString(RECORD_DELIMITER, delimiter);
this.config.setString(RECORD_DELIMITER_ENCODING, charsetName);
@SuppressWarnings("unchecked")
T ret = (T) this;
return ret;
}

/**
* Sets the number of line samples to take in order to estimate the base statistics for the
* input format.
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.apache.flink.api.common.io;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.flink.core.fs.FileInputSplit;
Expand All @@ -38,7 +39,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private static final boolean[] EMPTY_INCLUDED = new boolean[0];

private static final char DEFAULT_FIELD_DELIMITER = ',';
private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','};

private static final char QUOTE_CHARACTER = '"';


// --------------------------------------------------------------------------------------------
Expand All @@ -57,7 +60,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private boolean[] fieldIncluded = EMPTY_INCLUDED;

private char fieldDelim = DEFAULT_FIELD_DELIMITER;
private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;

private boolean lenient;

Expand Down Expand Up @@ -86,16 +89,24 @@ public int getNumberOfNonNullFields() {
return this.fieldTypes.length;
}

public char getFieldDelimiter() {
public byte[] getFieldDelimiter() {
return fieldDelim;
}

public void setFieldDelimiter(char fieldDelim) {
if (fieldDelim > Byte.MAX_VALUE) {
throw new IllegalArgumentException("The field delimiter must be an ASCII character.");
public void setFieldDelimiter(byte[] delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Delimiter must not be null");
}

this.fieldDelim = fieldDelim;

this.fieldDelim = delimiter;
}

public void setFieldDelimiter(char delimiter) {
setFieldDelimiter(String.valueOf(delimiter));
}

public void setFieldDelimiter(String delimiter) {
this.fieldDelim = delimiter.getBytes(Charsets.UTF_8);
}

public boolean isLenient() {
Expand Down Expand Up @@ -308,7 +319,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
}
else {
// skip field
startPos = skipFields(bytes, startPos, limit, fieldDelim);
startPos = skipFields(bytes, startPos, limit, this.fieldDelim);
if (startPos < 0) {
if (!lenient) {
String lineAsString = new String(bytes, offset, numBytes);
Expand All @@ -325,31 +336,32 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu
private String fieldTypesToString() {
StringBuilder string = new StringBuilder();
string.append(this.fieldTypes[0].toString());

for (int i = 1; i < this.fieldTypes.length; i++) {
string.append(", ").append(this.fieldTypes[i]);
}

return string.toString();
}

protected int skipFields(byte[] bytes, int startPos, int limit, char delim) {
protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) {

int i = startPos;

final byte delByte = (byte) delim;
byte current;
final int delimLimit = limit - delim.length + 1;

// skip over initial whitespace lines
while (i < limit && ((current = bytes[i]) == ' ' || current == '\t')) {
i++;
}

// first none whitespace character
if (i < limit && bytes[i] == '"') {
if (i < limit && bytes[i] == QUOTE_CHARACTER) {
// quoted string
i++; // the quote

while (i < limit && bytes[i] != '"') {
while (i < limit && bytes[i] != QUOTE_CHARACTER) {
i++;
}

Expand All @@ -358,27 +370,27 @@ protected int skipFields(byte[] bytes, int startPos, int limit, char delim) {
i++; // the quote

// skip trailing whitespace characters
while (i < limit && (current = bytes[i]) != delByte) {
while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) {
current = bytes[i];
if (current == ' ' || current == '\t') {
i++;
}
else {
} else {
return -1; // illegal case of non-whitespace characters trailing
}
}

return (i == limit ? limit : i+1);
return (i >= delimLimit ? limit : i + delim.length);
} else {
// exited due to line end without quote termination
return -1;
}
}
else {
// unquoted field
while (i < limit && bytes[i] != delByte) {
while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) {
i++;
}
return (i == limit ? limit : i+1);
return (i >= delimLimit ? limit : i + delim.length);
}
}
}
Expand Up @@ -25,25 +25,27 @@ public class ByteParser extends FieldParser<Byte> {
private byte result;

@Override
public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Byte reusable) {
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) {
int val = 0;
boolean neg = false;

final int delimLimit = limit-delimiter.length+1;

if (bytes[startPos] == '-') {
neg = true;
startPos++;

// check for empty field with only the sign
if (startPos == limit || bytes[startPos] == delimiter) {
if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
}

for (int i = startPos; i < limit; i++) {
if (bytes[i] == delimiter) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
this.result = (byte) (neg ? -val : val);
return i+1;
return i + delimiter.length;
}
if (bytes[i] < 48 || bytes[i] > 57) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
Expand Down
Expand Up @@ -30,27 +30,30 @@ public class ByteValueParser extends FieldParser<ByteValue> {
private ByteValue result;

@Override
public int parseField(byte[] bytes, int startPos, int limit, char delimiter, ByteValue reusable) {
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) {
int val = 0;
boolean neg = false;

this.result = reusable;

final int delimLimit = limit-delimiter.length+1;

if (bytes[startPos] == '-') {
neg = true;
startPos++;

// check for empty field with only the sign
if (startPos == limit || bytes[startPos] == delimiter) {
if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ORPHAN_SIGN);
return -1;
}
}

for (int i = startPos; i < limit; i++) {
if (bytes[i] == delimiter) {

if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
reusable.setValue((byte) (neg ? -val : val));
return i+1;
return i + delimiter.length;
}
if (bytes[i] < 48 || bytes[i] > 57) {
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
Expand Down
Expand Up @@ -29,18 +29,22 @@ public class DoubleParser extends FieldParser<Double> {
private double result;

@Override
public int parseField(byte[] bytes, int startPos, int limit, char delimiter, Double reusable) {
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) {
int i = startPos;
final byte delByte = (byte) delimiter;

final int delimLimit = limit-delimiter.length+1;

while (i < limit && bytes[i] != delByte) {
while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
break;
}
i++;
}

String str = new String(bytes, startPos, i-startPos);
try {
this.result = Double.parseDouble(str);
return (i == limit) ? limit : i+1;
return (i == limit) ? limit : i + delimiter.length;
}
catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
Expand Down
Expand Up @@ -29,12 +29,16 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
private DoubleValue result;

@Override
public int parseField(byte[] bytes, int startPos, int limit, char delim, DoubleValue reusable) {
public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) {

int i = startPos;
final byte delByte = (byte) delim;

while (i < limit && bytes[i] != delByte) {

final int delimLimit = limit-delimiter.length+1;

while (i < limit) {
if (i < delimLimit && delimiterNext(bytes, i, delimiter)) {
break;
}
i++;
}

Expand All @@ -43,7 +47,7 @@ public int parseField(byte[] bytes, int startPos, int limit, char delim, DoubleV
double value = Double.parseDouble(str);
reusable.setValue(value);
this.result = reusable;
return (i == limit) ? limit : i+1;
return (i == limit) ? limit : i + delimiter.length;
}
catch (NumberFormatException e) {
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
Expand Down

0 comments on commit 0548a93

Please sign in to comment.