Skip to content

Commit

Permalink
Load/store DataFrames from CSV #6
Browse files Browse the repository at this point in the history
* support for loader with options
  • Loading branch information
andrus committed Feb 18, 2019
1 parent cb335bd commit b4e027f
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 106 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
* #2 Support for DataFrame aggregation API
* #4 'vconcat' and 'hconcat' : support for concatenting DataFrames
* #5 'sort' and 'sortByColumns' to reorder DataFrames
* #6 Load/store DataFrames from CSV
* #7 RowProxy - "flyweight" API to hide access to Object[]
15 changes: 3 additions & 12 deletions dflib-csv/src/main/java/com/nhl/dflib/csv/Csv.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package com.nhl.dflib.csv;

import com.nhl.dflib.DataFrame;
import org.apache.commons.csv.CSVFormat;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.Reader;

public class Csv {

public static DataFrame fromFile(String filePath) {
return new CsvReader(CSVFormat.DEFAULT, () -> readerFromFilePath(filePath)).load();
return loader().fromFile(filePath);
}

private static Reader readerFromFilePath(String path) {
try {
return new FileReader(path);
} catch (FileNotFoundException e) {
throw new RuntimeException("File not found: " + path, e);
}
public static CsvLoader loader() {
return new CsvLoader();
}
}
157 changes: 157 additions & 0 deletions dflib-csv/src/main/java/com/nhl/dflib/csv/CsvLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package com.nhl.dflib.csv;

import com.nhl.dflib.DataFrame;
import com.nhl.dflib.Index;
import com.nhl.dflib.map.ValueMapper;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class CsvLoader {

private int skipRows;
private Index columns;
private CSVFormat format;

// storing converters as list to ensure predictable resolution order when the user supplies overlapping converters
private List<Pair> converters;

public CsvLoader() {
this.format = CSVFormat.DEFAULT;
this.converters = new ArrayList<>();
}

private static Reader readerFromFilePath(String path) {
try {
return new FileReader(path);
} catch (FileNotFoundException e) {
throw new RuntimeException("File not found: " + path, e);
}
}

/**
* Skips the specified number of rows. E.g. if the header is defined manually, you might call this method with "1"
* as an argument.
*
* @param n number of rows to skip
* @return this loader instance
*/
public CsvLoader skipRows(int n) {
this.skipRows = n;
return this;
}

/**
* Provides an alternative header to the returned DataFrame.
*
* @param columns user-defined DataFrame columns
* @return this loader instance
*/
public CsvLoader columns(String... columns) {
this.columns = Index.withNames(columns);
return this;
}

public CsvLoader columnTypes(ValueMapper<String, ?>... typeConverters) {
for (int i = 0; i < typeConverters.length; i++) {
int captureI = i;
converters.add(new Pair(ind -> captureI, typeConverters[i]));
}
return this;
}


public CsvLoader columnType(int column, ValueMapper<String, ?> typeConverter) {
converters.add(new Pair(i -> column, typeConverter));
return this;
}

public CsvLoader columnType(String column, ValueMapper<String, ?> typeConverter) {
converters.add(new Pair(i -> i.position(column).ordinal(), typeConverter));
return this;
}

public DataFrame fromFile(String filePath) {
try (Reader r = readerFromFilePath(filePath)) {
return fromReader(r);
} catch (IOException e) {
throw new RuntimeException("Error closing file reader: " + filePath, e);
}
}

public DataFrame fromReader(Reader reader) {
try (CSVParser parser = format.parse(reader)) {
Iterator<CSVRecord> it = parser.iterator();

rewind(it);
Index columns = createColumns(it);

if (!it.hasNext()) {
return DataFrame.fromRowsList(columns, Collections.emptyList());
}

ValueMapper<String, ?>[] converters = createConverters(columns);

return new CsvLoaderWorker(columns, converters).load(it);
} catch (IOException e) {
throw new RuntimeException("Error reading CSV", e);
}
}

private void rewind(Iterator<CSVRecord> it) {
for (int i = 0; i < skipRows && it.hasNext(); i++) {
it.next();
}
}

private Index createColumns(Iterator<CSVRecord> it) {
if (it.hasNext()) {
return columns != null ? columns : loadColumns(it.next());
} else {
return columns != null ? columns : Index.withNames();
}
}

private Index loadColumns(CSVRecord header) {

int width = header.size();
String[] columnNames = new String[width];
for (int i = 0; i < width; i++) {
columnNames[i] = header.get(i);
}

return Index.withNames(columnNames);
}

private ValueMapper<String, ?>[] createConverters(Index columns) {

ValueMapper<String, ?>[] converters = new ValueMapper[columns.span()];

// there may be overlapping pairs... the last one wins
for (Pair p : this.converters) {
converters[p.positionResolver.apply(columns)] = p.converter;
}

return converters;
}

private class Pair {
Function<Index, Integer> positionResolver;
ValueMapper<String, ?> converter;

Pair(Function<Index, Integer> positionResolver, ValueMapper<String, ?> converter) {
this.positionResolver = positionResolver;
this.converter = converter;
}
}
}
46 changes: 46 additions & 0 deletions dflib-csv/src/main/java/com/nhl/dflib/csv/CsvLoaderWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.nhl.dflib.csv;

import com.nhl.dflib.DataFrame;
import com.nhl.dflib.Index;
import com.nhl.dflib.map.ValueMapper;
import org.apache.commons.csv.CSVRecord;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CsvLoaderWorker {

private Index columns;
private ValueMapper<String, ?>[] converters;

public CsvLoaderWorker(Index columns, ValueMapper<String, ?>[] converters) {
this.columns = columns;
this.converters = converters;
}

DataFrame load(Iterator<CSVRecord> it) {

List<Object[]> rows = new ArrayList<>();

while (it.hasNext()) {
rows.add(loadRow(columns, it.next()));
}

return DataFrame.fromRowsList(columns, rows);
}

private Object[] loadRow(Index columns, CSVRecord record) {
int width = columns.size();

Object[] row = new Object[width];

for (int i = 0; i < width; i++) {
String v = record.get(i);
row[i] = (converters[i] != null) ? converters[i].map(v) : v;
}

return row;
}

}
77 changes: 0 additions & 77 deletions dflib-csv/src/main/java/com/nhl/dflib/csv/CsvReader.java

This file was deleted.

23 changes: 23 additions & 0 deletions dflib-csv/src/test/java/com/nhl/dflib/csv/BaseCsvTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.nhl.dflib.csv;

import org.junit.BeforeClass;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

public class BaseCsvTest {

private static File CSV_BASE;

@BeforeClass
public static void findCsvDir() throws URISyntaxException {
URI csvUri = CsvTest.class.getResource("f1.csv").toURI();
CSV_BASE = new File(csvUri).getAbsoluteFile().getParentFile();
}

protected static String csvPath(String name) {
return CSV_BASE.getPath() + File.separator + name;
}

}
Loading

0 comments on commit b4e027f

Please sign in to comment.