[MINOR] CSV parsing optimization bypassing hash function
Split the CSV parsing into separate code paths such that a task
directly parses the CSV into a dense or sparse matrix block.
In all tests the CSV parsing is now faster than before.

Also added tests for empty values in CSV, where the delimiters are
directly next to each other. This case was missing before, leading to
wasted time trying tokenizers.
Baunsgaard committed Apr 12, 2021
1 parent 614c168 commit 5c63f30
Showing 22 changed files with 616 additions and 320 deletions.
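
The hash function being bypassed is the NA-string set lookup that previously guarded every cell parse. A minimal sketch of the idea, with illustrative method names and an assumed NaN mapping for NA tokens (not the actual SystemDS code):

	import java.util.Set;

	public class ParseSketch {
		// Old path (sketch): every cell pays for a HashSet lookup, even when
		// no NA strings were configured, because a default set was always present.
		static double parseCell(String cell, Set<String> naStrings) {
			return naStrings.contains(cell) ? Double.NaN : Double.parseDouble(cell);
		}

		// New path (sketch): naStrings == null now means "no NA strings", so
		// the common case parses directly and skips the hash lookup entirely.
		static double parseCellFast(String cell, Set<String> naStrings) {
			if( naStrings == null )
				return Double.parseDouble(cell);
			return naStrings.contains(cell) ? Double.NaN : Double.parseDouble(cell);
		}
	}

Whether an NA token maps to NaN or some other sentinel is an assumption here; the point is the null branch that avoids the per-cell contains() call.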
@@ -626,7 +626,7 @@ public static FrameObject javaRDDStringCSVToFrameObject(JavaRDD<String> javaRDD,
JavaPairRDD<Long, FrameBlock> rdd;

rdd = FrameRDDConverterUtils.csvToBinaryBlock(jsc(), javaPairRDDText, mc, frameObject.getSchema(), false,
-	",", false, -1, UtilFunctions.defaultNaString);
+	",", false, -1, null);

frameObject.setRDDHandle(new RDDObject(rdd));
return frameObject;
@@ -66,7 +66,6 @@
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.ProgramConverter;
- import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.Statistics;

public class VariableCPInstruction extends CPInstruction implements LineageTraceable {
@@ -419,7 +418,7 @@ else if (parts.length >= 10) {
boolean hasHeader = Boolean.parseBoolean(parts[curPos]);
String delim = parts[curPos+1];
boolean fill = Boolean.parseBoolean(parts[curPos+2]);
- double fillValue = UtilFunctions.parseToDouble(parts[curPos+3],UtilFunctions.defaultNaString);
+ double fillValue = Double.parseDouble(parts[curPos+3]);
String naStrings = null;
if ( parts.length == 16+extSchema )
naStrings = parts[curPos+4];
@@ -22,6 +22,8 @@
import java.util.HashSet;
import java.util.Set;

+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
@@ -48,6 +50,9 @@
import org.apache.sysds.utils.Statistics;

public class CSVReblockSPInstruction extends UnarySPInstruction {

+ private static final Log LOG = LogFactory.getLog(CSVReblockSPInstruction.class.getName());

private int _blen;
private boolean _hasHeader;
private String _delim;
@@ -84,11 +89,14 @@ public static CSVReblockSPInstruction parseInstruction(String str) {
String delim = parts[5];
boolean fill = Boolean.parseBoolean(parts[6]);
double fillValue = Double.parseDouble(parts[7]);
+ Set<String> naStrings = null;
+
+ // Set<String> naStrings = UtilFunctions.defaultNaString;
- Set<String> naStrings = new HashSet<>();
- for(String s:parts[8].split(DataExpression.DELIM_NA_STRING_SEP)){
- 	naStrings.add(s);
+ String[] naS = parts[8].split(DataExpression.DELIM_NA_STRING_SEP);
+
+ if(naS.length > 0 && !(naS.length ==1 && naS[0].isEmpty())){
+ 	naStrings = new HashSet<>();
+ 	for(String s: naS)
+ 		naStrings.add(s);
  }

return new CSVReblockSPInstruction(null, in, out, blen, blen,
@@ -110,6 +118,7 @@ public void processInstruction(ExecutionContext ec) {
//set output characteristics
DataCharacteristics mcIn = sec.getDataCharacteristics(input1.getName());
DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());

mcOut.set(mcIn.getRows(), mcIn.getCols(), _blen);

//check for in-memory reblock (w/ lazy spark context, potential for latency reduction)
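
This hunk (and the matching one in FileFormatPropertiesCSV below) normalizes an empty NA-string specification to null instead of an empty set, so downstream code can branch on null. The pattern in isolation, with a hypothetical helper name:

	import java.util.HashSet;
	import java.util.Set;

	public class NaSpecSketch {
		// Returns null for an empty spec -- the signal for the fast parse path.
		static Set<String> parseNaStrings(String spec, String sep) {
			String[] naS = spec.split(sep);
			if( naS.length == 0 || (naS.length == 1 && naS[0].isEmpty()) )
				return null;
			Set<String> naStrings = new HashSet<>();
			for( String s : naS )
				naStrings.add(s);
			return naStrings;
		}
	}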
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Set;

+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -77,7 +79,7 @@
import scala.Tuple2;

public class RDDConverterUtils {
- // private static final Log LOG = LogFactory.getLog(RDDConverterUtils.class.getName());
+ private static final Log LOG = LogFactory.getLog(RDDConverterUtils.class.getName());

public static final String DF_ID_COLUMN = "__INDEX";

@@ -173,10 +175,10 @@ public static JavaPairRDD<MatrixIndexes, MatrixBlock> csvToBinaryBlock(JavaSpark
//(w/ robustness for mistakenly counted header in nnz)
if( !mc.dimsKnown(true) ) {
LongAccumulator aNnz = sc.sc().longAccumulator("nnz");
- JavaRDD<String> tmp = input.values()
- 	.map(new CSVAnalysisFunction(aNnz, delim));
+ CSVAnalysisFunction csvAF = new CSVAnalysisFunction(aNnz, delim);
+ JavaRDD<String> tmp = input.values().map(csvAF);
long rlen = tmp.count() - (hasHeader ? 1 : 0);
- long clen = tmp.first().split(delim).length;
+ long clen = IOUtilFunctions.split(tmp.first(), delim).length;
long nnz = Math.min(rlen*clen, UtilFunctions.toLong(aNnz.value()));
mc.set(rlen, clen, mc.getBlocksize(), nnz);
}
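
The switch from tmp.first().split(delim) to IOUtilFunctions.split matters for the column count: java.lang.String.split drops trailing empty tokens, so a first row ending in delimiters would under-count clen. A small demonstration, assuming the commons-lang3 StringUtils that IOUtilFunctions.split delegates to (see the split hunk further down):

	import org.apache.commons.lang3.StringUtils;

	public class ClenSketch {
		public static void main(String[] args) {
			// Regex split drops trailing empty cells -> clen of 2 (wrong).
			System.out.println("1,2,".split(",").length);
			// Whole-separator split preserves all tokens -> clen of 3 (right).
			System.out.println(StringUtils.splitByWholeSeparatorPreserveAllTokens("1,2,", ",").length);
		}
	}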
@@ -611,8 +613,8 @@ private static class CSVAnalysisFunction implements Function<Text,String>
{
private static final long serialVersionUID = 2310303223289674477L;

- private LongAccumulator _aNnz = null;
- private String _delim = null;
+ private final LongAccumulator _aNnz;
+ private final String _delim;

public CSVAnalysisFunction( LongAccumulator aNnz, String delim )
{
@@ -633,7 +635,6 @@ public String call(Text v1)

//update counters
_aNnz.add( lnnz );

return line;
}

@@ -673,7 +674,7 @@ public CSVToBinaryBlockFunction(DataCharacteristics mc, boolean sparse, boolean
_delim = delim;
_fill = fill;
_fillValue = fillValue;
- _naStrings = naStrings == null ? UtilFunctions.defaultNaString : naStrings;
+ _naStrings = naStrings;
}

@Override
@@ -710,18 +711,20 @@ public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<Text,Lo
boolean emptyFound = false;
for( int cix=1, pix=0; cix<=ncblks; cix++ )
{
+ final MatrixBlock mbc = mb[cix-1];
int lclen = UtilFunctions.computeBlockSize(_clen, cix, _blen);
- if( mb[cix-1].isInSparseFormat() ) {
+ if( mbc.isInSparseFormat() ) {
//allocate row once (avoid re-allocations)
int lnnz = IOUtilFunctions.countNnz(parts, pix, lclen);
- 	mb[cix-1].getSparseBlock().allocate(pos, lnnz);
+ 	mbc.getSparseBlock().allocate(pos, lnnz);
}

for( int j=0; j<lclen; j++ ) {
String part = parts[pix++].trim();
emptyFound |= part.isEmpty() && !_fill;
double val = (part.isEmpty() && _fill) ?
_fillValue : UtilFunctions.parseToDouble(part, _naStrings);
- 	mb[cix-1].appendValue(pos, j, val);
+ 	mbc.appendValue(pos, j, val);
}
}

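
The mbc local is a small hot-loop optimization: the mb[cix-1] array load (and its bounds check) happens once per column block instead of once per cell. In isolation the pattern looks like this (a sketch; vals stands in for the parsed cell values):

	// Before (sketch): repeated array indexing inside the per-cell loop.
	for( int j = 0; j < lclen; j++ )
		mb[cix-1].appendValue(pos, j, vals[j]);

	// After (sketch): one array load, reused for every cell in the block.
	final MatrixBlock mbc = mb[cix-1];
	for( int j = 0; j < lclen; j++ )
		mbc.appendValue(pos, j, vals[j]);

The JIT can often hoist this itself, but the explicit final local also reads better.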
@@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.parser.DataExpression;
- import org.apache.sysds.runtime.util.UtilFunctions;

public class FileFormatPropertiesCSV extends FileFormatProperties implements Serializable
{
@@ -36,7 +35,7 @@ public class FileFormatPropertiesCSV extends FileFormatProperties implements Ser
private String delim;
private boolean fill;
private double fillValue;
- private HashSet<String> naStrings;
+ private HashSet<String> naStrings; // default null

private boolean sparse;

@@ -47,7 +46,7 @@ public FileFormatPropertiesCSV() {
this.fill = DataExpression.DEFAULT_DELIM_FILL;
this.fillValue = DataExpression.DEFAULT_DELIM_FILL_VALUE;
this.sparse = DataExpression.DEFAULT_DELIM_SPARSE;
- this.naStrings = UtilFunctions.defaultNaString;

if(LOG.isDebugEnabled())
LOG.debug("FileFormatPropertiesCSV: " + this.toString());

Expand All @@ -60,9 +59,12 @@ public FileFormatPropertiesCSV(boolean hasHeader, String delim, boolean fill, do
this.fill = fill;
this.fillValue = fillValue;

- this.naStrings = new HashSet<>();
- for(String s: naStrings.split(DataExpression.DELIM_NA_STRING_SEP)){
- 	this.naStrings.add(s);
+ String[] naS = naStrings.split(DataExpression.DELIM_NA_STRING_SEP);
+
+ if(naS.length > 0 && !(naS.length ==1 && naS[0].isEmpty())){
+ 	this.naStrings = new HashSet<>();
+ 	for(String s: naS)
+ 		this.naStrings.add(s);
}
if(LOG.isDebugEnabled())
LOG.debug("FileFormatPropertiesCSV full settings: " + this.toString());
@@ -73,7 +75,6 @@ public FileFormatPropertiesCSV(boolean hasHeader, String delim, boolean sparse)
this.header = hasHeader;
this.delim = delim;
this.sparse = sparse;
- this.naStrings = UtilFunctions.defaultNaString;
if(LOG.isDebugEnabled()){
LOG.debug("FileFormatPropertiesCSV medium settings: " + this.toString());
}
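
Net effect of the changes in this file: a FileFormatPropertiesCSV built without NA strings now carries naStrings == null instead of UtilFunctions.defaultNaString, and the empty-spec guard keeps "" from materializing as a one-element set. A hedged usage sketch (constructor argument order as in the hunks above; imports omitted):

	// No-arg constructor: naStrings stays null -> fast parse path downstream.
	FileFormatPropertiesCSV defaults = new FileFormatPropertiesCSV();

	// Explicit NA strings still produce a populated set; the separator
	// between entries comes from DataExpression.DELIM_NA_STRING_SEP.
	FileFormatPropertiesCSV withNa = new FileFormatPropertiesCSV(true, ",", false, 0.0,
		"NA" + DataExpression.DELIM_NA_STRING_SEP + "N/A");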
@@ -161,7 +161,7 @@ else if(parts[0].equals(TfUtils.TXMTD_NDPREFIX))
for(String part : parts) // foreach cell
{
part = part.trim();
- if(part.isEmpty() || naValues.contains(part)) {
+ if(part.isEmpty() || (naValues != null && naValues.contains(part))) {
if(isFill && dfillValue != 0)
dest.set(row, col, UtilFunctions.stringToObject(schema[col], sfillValue));
emptyValuesFound = true;
@@ -180,6 +180,7 @@ public static String[] split(String str, String delim)
{
//split by whole separator required for multi-character delimiters, preserve
//all tokens required for empty cells and in order to keep cell alignment

return StringUtils.splitByWholeSeparatorPreserveAllTokens(str, delim);
}
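
Two properties make splitByWholeSeparatorPreserveAllTokens the right tokenizer here: the delimiter is taken literally rather than as a regex, and empty cells between adjacent delimiters survive, which is exactly the case the new tests cover. For example:

	import org.apache.commons.lang3.StringUtils;

	public class DelimSketch {
		public static void main(String[] args) {
			// "|" is a regex metacharacter, so String.split misparses it:
			System.out.println("a|b".split("|").length); // 3 on Java 8+: ["a","|","b"]
			// The whole-separator variant takes "|" literally and keeps the
			// empty cell between the two adjacent delimiters:
			String[] toks = StringUtils.splitByWholeSeparatorPreserveAllTokens("a||b", "|");
			System.out.println(toks.length); // 3 -> ["a","","b"]
		}
	}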

@@ -281,7 +282,7 @@ else if( str.regionMatches(from, delim, 0, dlen) ) {
// slice out token and advance position
to = (to >= 0) ? to : len;
curString = str.substring(from, to);
- tokens[pos++] = (naStrings.contains(curString)) ? null: curString;
+ tokens[pos++] = naStrings!= null ? ((naStrings.contains(curString)) ? null: curString): curString;
from = to + delim.length();
}

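
The ternary added to splitCSV is the same null guard in tokenizer form; written out for readability (an equivalent form of the new line, not a further change):

	// Only consult the set when NA strings exist; matching tokens become
	// null placeholders, everything else passes through unchanged.
	if( naStrings != null && naStrings.contains(curString) )
		tokens[pos++] = null;
	else
		tokens[pos++] = curString;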
