Skip to content

Commit

Permalink
[FLINK-1512] [java api] Add CsvReader for reading into POJOs
Browse files Browse the repository at this point in the history
  • Loading branch information
chiwanpark authored and fhueske committed Mar 25, 2015
1 parent 033c69f commit 7b1c19c
Show file tree
Hide file tree
Showing 7 changed files with 738 additions and 240 deletions.
Expand Up @@ -29,12 +29,21 @@
import org.apache.flink.types.parser.StringParser;
import org.apache.flink.types.parser.StringValueParser;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;


public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {

private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);

private static final long serialVersionUID = 1L;

Expand All @@ -50,6 +59,13 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
// --------------------------------------------------------------------------------------------

private transient FieldParser<?>[] fieldParsers;

// To speed up readRecord processing. Used to find windows line endings.
// It is set when open so that readRecord does not have to evaluate it
protected boolean lineDelimiterIsLinebreak = false;

protected transient int commentCount;
protected transient int invalidLineCount;


// --------------------------------------------------------------------------------------------
Expand All @@ -58,7 +74,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>

private Class<?>[] fieldTypes = EMPTY_TYPES;

private boolean[] fieldIncluded = EMPTY_INCLUDED;
protected boolean[] fieldIncluded = EMPTY_INCLUDED;

private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER;

Expand All @@ -69,8 +85,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
private boolean quotedStringParsing = false;

private byte quoteCharacter;



protected byte[] commentPrefix = null;


// --------------------------------------------------------------------------------------------
// Constructors and getters/setters for the configurable parameters
// --------------------------------------------------------------------------------------------
Expand All @@ -93,6 +111,46 @@ public int getNumberOfNonNullFields() {
return this.fieldTypes.length;
}

public byte[] getCommentPrefix() {
return commentPrefix;
}

public void setCommentPrefix(byte[] commentPrefix) {
this.commentPrefix = commentPrefix;
}

public void setCommentPrefix(char commentPrefix) {
setCommentPrefix(String.valueOf(commentPrefix));
}

public void setCommentPrefix(String commentPrefix) {
setCommentPrefix(commentPrefix, Charsets.UTF_8);
}

public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new IllegalArgumentException("Charset name must not be null");
}

if (commentPrefix != null) {
Charset charset = Charset.forName(charsetName);
setCommentPrefix(commentPrefix, charset);
} else {
this.commentPrefix = null;
}
}

public void setCommentPrefix(String commentPrefix, Charset charset) {
if (charset == null) {
throw new IllegalArgumentException("Charset must not be null");
}
if (commentPrefix != null) {
this.commentPrefix = commentPrefix.getBytes(charset);
} else {
this.commentPrefix = null;
}
}

public byte[] getFieldDelimiter() {
return fieldDelim;
}
Expand Down Expand Up @@ -291,7 +349,23 @@ public void open(FileInputSplit split) throws IOException {
readLine(); // read and ignore
}
}


@Override
public void close() throws IOException {
if (this.invalidLineCount > 0) {
if (LOG.isWarnEnabled()) {
LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped.");
}
}

if (this.commentCount > 0) {
if (LOG.isInfoEnabled()) {
LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped.");
}
}
super.close();
}

protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException {

boolean[] fieldIncluded = this.fieldIncluded;
Expand Down Expand Up @@ -400,4 +474,58 @@ protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) {
}
}
}

@SuppressWarnings("unused")
protected static void checkAndCoSort(int[] positions, Class<?>[] types) {
if (positions.length != types.length) {
throw new IllegalArgumentException("The positions and types must be of the same length");
}

TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>();

for (int i = 0; i < positions.length; i++) {
if (positions[i] < 0) {
throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
}
if (types[i] == null) {
throw new IllegalArgumentException("The type " + i + " is invalid (null)");
}

if (map.containsKey(positions[i])) {
throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times.");
}

map.put(positions[i], types[i]);
}

int i = 0;
for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) {
positions[i] = entry.getKey();
types[i] = entry.getValue();
i++;
}
}

protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types) {
if (positions.length != types.length) {
throw new IllegalArgumentException("The positions and types must be of the same length");
}

int lastPos = -1;

for (int i = 0; i < positions.length; i++) {
if (positions[i] < 0) {
throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid.");
}
if (types[i] == null) {
throw new IllegalArgumentException("The type " + i + " is invalid (null)");
}

if (positions[i] <= lastPos) {
throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported).");
}

lastPos = positions[i];
}
}
}

0 comments on commit 7b1c19c

Please sign in to comment.