Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,26 @@ protected Pair<Integer, Integer> computeCSVSize(Path path, JobConf job, FileSyst
// compute number of rows
int nrow = 0;
for(int i = 0; i < splits.length; i++) {
RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
try {
nrow = countLinesInReader(reader, ncol , i == 0 && _props.hasHeader());
}
finally {
IOUtilFunctions.closeSilently(reader);
}
boolean header = i == 0 && _props.hasHeader();
nrow += countLinesInReader(splits[i], informat, job, ncol, header);
}

return new Pair<>(nrow, ncol);
}

protected static int countLinesInReader(RecordReader<LongWritable, Text> reader, int ncol, boolean header)

/**
 * Opens a record reader over the given input split, counts the rows it
 * contains, and guarantees the reader is closed afterwards.
 *
 * @param split    the input split to scan
 * @param inFormat text input format used to obtain the record reader
 * @param job      Hadoop job configuration
 * @param ncol     number of columns, forwarded to the line-counting routine
 * @param header   true if the first line of this split is a header row to skip
 * @return the number of counted rows in the split
 * @throws IOException if creating the reader or reading the split fails
 */
protected static int countLinesInReader(InputSplit split, TextInputFormat inFormat, JobConf job, long ncol,
	boolean header) throws IOException {
	final RecordReader<LongWritable, Text> recordReader = inFormat.getRecordReader(split, job, Reporter.NULL);
	try {
		return countLinesInReader(recordReader, ncol, header);
	}
	finally {
		// always release the underlying stream, even on exception
		IOUtilFunctions.closeSilently(recordReader);
	}
}

protected static int countLinesInReader(RecordReader<LongWritable, Text> reader, long ncol, boolean header)
throws IOException {
final LongWritable key = new LongWritable();
final Text value = new Text();
Expand All @@ -231,7 +239,7 @@ protected static int countLinesInReader(RecordReader<LongWritable, Text> reader,
reader.next(key, value);
while(reader.next(key, value)) {
// note the metadata can be located at any row when spark.
nrow += containsMetaTag(value.toString()) ? 0 : 1;
nrow += containsMetaTag(value) ? 0 : 1;
}
return nrow;
}
Expand All @@ -240,8 +248,11 @@ protected static int countLinesInReader(RecordReader<LongWritable, Text> reader,
}
}

private final static boolean containsMetaTag(String val) {
return val.startsWith(TfUtils.TXMTD_MVPREFIX)//
|| val.startsWith(TfUtils.TXMTD_NDPREFIX);
/**
 * Checks whether a CSV line is a transform-metadata line (missing-value or
 * n-distinct metadata), which must not be counted as a data row.
 *
 * @param val the text of the current line
 * @return true iff the line starts with a known metadata prefix
 */
private static boolean containsMetaTag(Text val) {
	// cheap first-character guard before scanning for the full prefix
	// (assumes both TXMTD prefixes begin with '#' -- TODO confirm in TfUtils)
	if(val.charAt(0) != '#')
		return false;
	// match at position 0 only (startsWith semantics): find(...) > -1 would
	// also flag a regular data line that merely contains the prefix mid-line
	return val.find(TfUtils.TXMTD_MVPREFIX) == 0
		|| val.find(TfUtils.TXMTD_NDPREFIX) == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.OptimizerUtils;
Expand Down Expand Up @@ -72,7 +68,7 @@ protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs,
//compute num rows per split
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0, clen));
List<Future<Integer>> cret = pool.invokeAll(tasks);

//compute row offset per split via cumsum on row counts
Expand Down Expand Up @@ -113,7 +109,7 @@ protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSyst
try {
ArrayList<CountRowsTask> tasks = new ArrayList<>();
for( int i=0; i<splits.length; i++ )
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0));
tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0, ncol));
List<Future<Integer>> cret = pool.invokeAll(tasks);
for( Future<Integer> count : cret )
nrow += count.get().intValue();
Expand All @@ -130,34 +126,25 @@ protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSyst
return new Pair<>((int)nrow, ncol);
}

private static class CountRowsTask implements Callable<Integer>
{
private InputSplit _split = null;
private TextInputFormat _informat = null;
private JobConf _job = null;
private boolean _hasHeader = false;
private boolean _firstSplit = false;
private static class CountRowsTask implements Callable<Integer> {
private final InputSplit _split;
private final TextInputFormat _informat;
private final JobConf _job;
private final boolean _hasHeader;
private final long _nCol;

public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, boolean first) {
public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, long nCol) {
_split = split;
_informat = informat;
_job = job;
_hasHeader = hasHeader;
_firstSplit = first;
_nCol = nCol;
}

@Override
public Integer call()
throws Exception
{
RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL);
try {
// it is assumed that if we read parallel number of rows, there are at least two columns.
return countLinesInReader(reader, 2 , _firstSplit && _hasHeader);
}
finally {
IOUtilFunctions.closeSilently(reader);
}
public Integer call() throws Exception {
return countLinesInReader(_split, _informat, _job, _nCol, _hasHeader);

}
}

Expand Down