Skip to content

Commit

Permalink
PUBDEV-5429: Add support for ingesting from multiple directories + sp…
Browse files Browse the repository at this point in the history
…eedup (#2653)

This change also improves the performance of ingest when importing from
a large number of source files / directories. The previous implementation
was sequential and resulted in a single request per source. This is
especially bad for HTTP ingest, which is done eagerly.

The current implementation does a single request for all the files. Loading
is parallelized on a single node (not distributed - typically not necessary).
  • Loading branch information
michalkurka committed Jul 18, 2018
1 parent 4740d86 commit 95af092
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 16 deletions.
26 changes: 22 additions & 4 deletions h2o-core/src/main/java/water/api/ImportFilesHandler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package water.api;

import water.H2O;
import water.api.schemas3.ImportFilesMultiV3;
import water.api.schemas3.ImportFilesV3;

import java.util.ArrayList;
Expand All @@ -16,10 +17,10 @@ public class ImportFilesHandler extends Handler {

@SuppressWarnings("unused") // called through reflection by RequestServer
public ImportFilesV3 importFiles(int version, ImportFilesV3 importFiles) {
ArrayList<String> files = new ArrayList();
ArrayList<String> keys = new ArrayList();
ArrayList<String> fails = new ArrayList();
ArrayList<String> dels = new ArrayList();
ArrayList<String> files = new ArrayList<>();
ArrayList<String> keys = new ArrayList<>();
ArrayList<String> fails = new ArrayList<>();
ArrayList<String> dels = new ArrayList<>();

H2O.getPM().importFiles(importFiles.path, importFiles.pattern, files, keys, fails, dels);

Expand All @@ -29,5 +30,22 @@ public ImportFilesV3 importFiles(int version, ImportFilesV3 importFiles) {
importFiles.dels = dels.toArray(new String[dels.size()]);
return importFiles;
}

/**
 * Handles the multi-path import request: imports every path listed in the
 * schema and writes the collected results back into the same schema object.
 *
 * @param version     API version of the request (not used by this method)
 * @param importFiles request schema carrying {@code paths} and {@code pattern};
 *                    its output fields are filled in before it is returned
 * @return the same schema instance, populated with the import results
 */
@SuppressWarnings("unused") // called through reflection by RequestServer
public ImportFilesMultiV3 importFilesMulti(int version, ImportFilesMultiV3 importFiles) {
  final ArrayList<String> importedFiles = new ArrayList<>();
  final ArrayList<String> frameKeys = new ArrayList<>();
  final ArrayList<String> failures = new ArrayList<>();
  final ArrayList<String> deletions = new ArrayList<>();

  // The persist manager appends its results to the four lists passed in.
  H2O.getPM().importFiles(importFiles.paths, importFiles.pattern,
      importedFiles, frameKeys, failures, deletions);

  importFiles.files = importedFiles.toArray(new String[0]);
  importFiles.destination_frames = frameKeys.toArray(new String[0]);
  importFiles.fails = failures.toArray(new String[0]);
  importFiles.dels = deletions.toArray(new String[0]);
  return importFiles;
}

}

4 changes: 4 additions & 0 deletions h2o-core/src/main/java/water/api/RegisterV3Api.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public void registerEndPoints(RestApiContext context) {
"POST /3/ImportFiles", ImportFilesHandler.class, "importFiles",
"Import raw data files into a single-column H2O Frame.");

context.registerEndpoint("importFilesMulti",
"POST /3/ImportFilesMulti", ImportFilesHandler.class, "importFilesMulti",
"Import raw data files from multiple directories (or different data sources) into a single-column H2O Frame.");

context.registerEndpoint("importSqlTable",
"POST /99/ImportSQLTable", ImportSQLTableHandler.class, "importSQLTable",
"Import SQL table into an H2O Frame.");
Expand Down
37 changes: 37 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ImportFilesMultiV3.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package water.api.schemas3;

import water.Iced;
import water.api.API;

/**
 * Version 3 request schema for importing files from several paths in a single call.
 * Carries the input paths and optional pattern, plus the per-request result arrays.
 */
public class ImportFilesMultiV3 extends RequestSchemaV3<ImportFilesMultiV3.ImportFilesMulti, ImportFilesMultiV3> {

  /** Backing object for this schema; declares the same fields as the schema itself. */
  public static final class ImportFilesMulti extends Iced<ImportFilesMulti> {
    public String[] paths;
    public String pattern;
    public String[] files;
    public String[] destination_frames;
    public String[] fails;
    public String[] dels;
  }

  // ---- Input fields ----

  @API(help = "paths", required = true)
  public String[] paths;

  @API(help = "pattern", direction = API.Direction.INPUT)
  public String pattern;

  // ---- Output fields ----

  @API(help = "files", direction = API.Direction.OUTPUT)
  public String[] files;

  @API(help = "names", direction = API.Direction.OUTPUT)
  public String[] destination_frames;

  @API(help = "fails", direction = API.Direction.OUTPUT)
  public String[] fails;

  @API(help = "dels", direction = API.Direction.OUTPUT)
  public String[] dels;

}
73 changes: 69 additions & 4 deletions h2o-core/src/main/java/water/persist/PersistManager.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package water.persist;

import water.H2O;
import water.Key;
import water.MRTask;
import water.Value;
import water.*;
import water.exceptions.H2OIllegalArgumentException;
import water.fvec.UploadFileVec;
import water.parser.BufferedString;
import water.util.FileUtils;
import water.util.FrameUtils;
import water.util.Log;
Expand Down Expand Up @@ -266,6 +264,73 @@ public List<String> calcTypeaheadMatches(String filter, int limit) {
return I[Value.NFS].calcTypeaheadMatches(filter, limit);
}

/**
 * Imports files from several source paths and appends the combined results to the
 * supplied lists.
 *
 * <p>An empty {@code paths} array is a no-op. A single path is delegated directly to
 * the single-path {@code importFiles}. For two or more paths, one
 * {@link ImportFilesTask} slot is processed per path via {@link LocalMR} on this node,
 * and the per-path results are appended in the order of {@code paths}.
 *
 * @param paths   source paths to import; must not be {@code null}
 * @param pattern pattern forwarded unchanged to the single-path import
 * @param files   receives the imported file names
 * @param keys    receives the keys of the imported files
 * @param fails   receives the entries that failed to import
 * @param dels    receives the deleted entries
 */
public void importFiles(String[] paths, String pattern, ArrayList<String> files, ArrayList<String> keys, ArrayList<String> fails, ArrayList<String> dels) {
  if (paths.length == 0) {
    // Nothing to import: avoid scheduling a LocalMR over an empty index range.
    return;
  }
  if (paths.length == 1) {
    // Single source: no need for the parallel machinery.
    importFiles(paths[0], pattern, files, keys, fails, dels);
    return;
  }

  ImportFilesTask importFilesTask = new ImportFilesTask(paths, pattern);
  // Block until every per-path import has finished.
  H2O.submitTask(new LocalMR(importFilesTask, paths.length)).join();

  ImportFilesTask.addAllTo(importFilesTask._pFiles, files);
  ImportFilesTask.addAllTo(importFilesTask._pKeys, keys);
  ImportFilesTask.addAllTo(importFilesTask._pFails, fails);
  ImportFilesTask.addAllTo(importFilesTask._pDels, dels);
}

/**
 * Per-path import job: {@code map(t)} imports {@code _paths[t]} and stores the outcome
 * in row {@code t} of each result table.
 *
 * NOTE(review): results are kept as {@link BufferedString} rather than {@code String};
 * presumably this is required by the {@code MrFun} base type - confirm.
 */
private static class ImportFilesTask extends MrFun<ImportFilesTask> {

  private final String[] _paths;
  private final String _pattern;

  // Result tables, one row per input path; each row is assigned by map(t).
  BufferedString[][] _pFiles;
  BufferedString[][] _pKeys;
  BufferedString[][] _pFails;
  BufferedString[][] _pDels;

  /**
   * @param paths   paths to import, one map slot per element
   * @param pattern pattern passed to every single-path import
   */
  public ImportFilesTask(String[] paths, String pattern) {
    final int slots = paths.length;
    _paths = paths;
    _pattern = pattern;
    _pFiles = new BufferedString[slots][];
    _pKeys = new BufferedString[slots][];
    _pFails = new BufferedString[slots][];
    _pDels = new BufferedString[slots][];
  }

  /** Imports the path at index {@code t} and records its results in row {@code t}. */
  @Override
  protected void map(int t) {
    ArrayList<String> importedFiles = new ArrayList<>();
    ArrayList<String> importedKeys = new ArrayList<>();
    ArrayList<String> failed = new ArrayList<>();
    ArrayList<String> deleted = new ArrayList<>();

    H2O.getPM().importFiles(_paths[t], _pattern, importedFiles, importedKeys, failed, deleted);

    _pFiles[t] = toArray(importedFiles);
    _pKeys[t] = toArray(importedKeys);
    _pFails[t] = toArray(failed);
    _pDels[t] = toArray(deleted);
  }

  /** Wraps every string of {@code ls} in a {@link BufferedString}, preserving order. */
  private static BufferedString[] toArray(List<String> ls) {
    final int count = ls.size();
    BufferedString[] wrapped = new BufferedString[count];
    for (int idx = 0; idx < count; idx++) {
      wrapped[idx] = new BufferedString(ls.get(idx));
    }
    return wrapped;
  }

  /** Appends all entries of {@code bssAry}, row by row, to {@code target} as strings. */
  private static void addAllTo(BufferedString[][] bssAry, ArrayList<String> target) {
    for (int row = 0; row < bssAry.length; row++) {
      BufferedString[] entries = bssAry[row];
      for (int col = 0; col < entries.length; col++) {
        target.add(entries[col].toString());
      }
    }
  }

}

/**
* From a path produce a list of files and keys for parsing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ water.api.schemas3.GarbageCollectV3
water.api.schemas3.H2OErrorV3
water.api.schemas3.H2OModelBuilderErrorV3
water.api.schemas3.ImportFilesV3
water.api.schemas3.ImportFilesMultiV3
water.api.schemas3.ImportSQLTableV99
water.api.schemas3.InitIDV3
water.api.schemas3.InteractionV3
Expand Down
14 changes: 6 additions & 8 deletions h2o-py/h2o/h2o.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,15 @@ def lazy_import(path, pattern=None):
"""
assert_is_type(path, str, [str])
assert_is_type(pattern, str, None)
if is_type(path, str):
return _import(path, pattern)
else:
return [_import(p, pattern)[0] for p in path]
paths = [path] if is_type(path, str) else path
return _import_multi(paths, pattern)


def _import(path, pattern):
assert_is_type(path, str)
def _import_multi(paths, pattern):
    """
    Import files from one or more paths with a single ImportFilesMulti request.

    :param paths: list of path strings to import.
    :param pattern: optional pattern string sent along with the request, or None.
    :returns: the ``destination_frames`` entry of the server response.
    :raises ValueError: if the response reports any failed imports.
    """
    assert_is_type(paths, [str])
    assert_is_type(pattern, str, None)
    j = api("POST /3/ImportFilesMulti", {"paths": paths, "pattern": pattern})
    if j["fails"]:
        # `paths` is a list, so it must be converted before string concatenation;
        # otherwise building the message raises TypeError instead of ValueError.
        raise ValueError("ImportFiles of " + str(paths) + " failed on " + str(j["fails"]))
    return j["destination_frames"]


Expand Down
19 changes: 19 additions & 0 deletions h2o-py/tests/testdir_misc/pyunit_import_multi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from __future__ import print_function
import sys
sys.path.insert(1,"../../")
import h2o
from tests import pyunit_utils

# test that h2o.import_file works on multiple sources
def import_multi():
    """Import two airlines CSV files in one call and check the combined row count."""
    sources = [
        pyunit_utils.locate("smalldata/testng/airlines_train.csv"),
        pyunit_utils.locate("smalldata/testng/airlines_test.csv"),
    ]
    airlines = h2o.import_file(path=sources)

    # Expected total is the sum of the two files' row counts.
    assert airlines.nrows == 24421 + 2691

# Executed as a script: hand the test to pyunit_utils.standalone_test.
# Otherwise (module loaded some other way): run the test function directly.
if __name__ == "__main__":
    pyunit_utils.standalone_test(import_multi)
else:
    import_multi()

0 comments on commit 95af092

Please sign in to comment.