Skip to content

Commit

Permalink
Fix NPE when json file reload (#112)
Browse files Browse the repository at this point in the history
Fix #111
  • Loading branch information
Linary authored and zhoney committed Dec 5, 2019
1 parent d8cf21f commit 62727dd
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Expand Up @@ -22,7 +22,7 @@ before_install: mvn install:install-file -Dfile=$STATIC_DIR/lib/ojdbc8-12.2.0.1.
install: mvn compile -Dmaven.javadoc.skip=true | grep -v "Downloading\|Downloaded"

before_script:
- $TRAVIS_DIR/install-hugegraph.sh $TRAVIS_BRANCH | grep -v "Downloading\|Downloaded"
- $TRAVIS_DIR/install-hugegraph.sh $SERVER_VERSION | grep -v "Downloading\|Downloaded"
- |
if [ "$SOURCE_TYPE" == "hdfs" ]; then
$TRAVIS_DIR/install-hadoop.sh
Expand All @@ -47,3 +47,4 @@ env:
global:
- TRAVIS_DIR=assembly/travis
- STATIC_DIR=assembly/static
- SERVER_VERSION=0.10.4
28 changes: 7 additions & 21 deletions assembly/travis/install-hugegraph.sh
Expand Up @@ -2,33 +2,19 @@

set -ev

# Require exactly one argument: the hugegraph server version to install.
if [[ $# -ne 1 ]]; then
    echo "Must pass server version of hugegraph"
    exit 1
fi

VERSION=$1
# Install from a prebuilt release tarball instead of cloning and building
# the server from source (faster and independent of branch names).
HUGEGRAPH_LINK="https://github.com/hugegraph/hugegraph/releases/download/v${VERSION}/hugegraph-${VERSION}.tar.gz"

wget ${HUGEGRAPH_LINK} || exit 1

tar -zxvf hugegraph-${VERSION}.tar.gz

cd hugegraph-${VERSION}

# Initialize the backend store before the server can be used by tests.
bin/init-store.sh || exit 1

Expand Down
Expand Up @@ -48,7 +48,11 @@ public String[] header() {
}

/**
 * Sets the header columns of this element source.
 *
 * A JSON input source has no header, so {@code null} must be accepted;
 * passing {@code null} straight to {@code Arrays.asList} would throw an
 * NPE when the struct file is reloaded (issue #111).
 *
 * @param header the column names, or {@code null} to clear the header
 */
public void header(String[] header) {
    if (header == null) {
        this.header = null;
    } else {
        this.header = Arrays.asList(header);
    }
}

@Override
Expand Down
Expand Up @@ -22,7 +22,6 @@
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -2221,4 +2220,102 @@ public void testLoadIncrementalModeAndReloadFailure()

FileUtils.forceDeleteOnExit(structDir);
}

// Regression test for NPE when reloading failure records produced by a
// JSON input source (a JSON source has no CSV-style header).
// Flow: load with one bad edge line -> verify progress/failure file ->
// reload unmodified (error persists) -> fix the line -> reload (succeeds).
@Test
public void testReloadJsonFailureFiles() throws IOException,
InterruptedException {
// Three valid person vertices from a CSV source.
ioUtil.write("vertex_person.csv",
"name,age,city",
"marko,29,Beijing",
"vadas,27,Hongkong",
"tom,28,Wuhan");
// Two edges in JSON: the first is valid, the second references
// vertices that were never loaded and will fail the vertex check.
ioUtil.write("edge_knows.json",
"{\"source_name\": \"marko\", \"target_name\": " +
"\"vadas\", \"date\": \"2016-01-10 12:00:00\"," +
"\"weight\": 0.5}",
// nonexistent source and target vertex
"{\"source_name\": \"marko1\", \"target_name\": " +
"\"vadas1\", \"date\": \"2013-02-20 13:00:00\"," +
"\"weight\": 1.0}");

String[] args = new String[]{
"-f", structPath("reload_json_failure_files/struct.json"),
"-s", configPath("reload_json_failure_files/schema.groovy"),
"-g", GRAPH,
"-h", SERVER,
"--check-vertex", "true",
"--num-threads", "2",
"--max-parse-errors", "1",
"--test-mode", "false"
};
HugeGraphLoader loader = new HugeGraphLoader(args);
loader.load();
// Grab the private load context to inspect the recorded progress.
LoadContext context = Whitebox.getInternalState(loader, "context");

// Only the first (valid) edge should have been inserted.
List<Edge> edges = CLIENT.graph().listEdges();
Assert.assertEquals(1, edges.size());

InputProgressMap progresses = context.newProgress().edge();
Assert.assertEquals(1, progresses.size());
progresses.forEach((key, value) -> {
Assert.assertTrue(key.startsWith("knows"));
// The error line is exactly last line
Set<InputItemProgress> loadedItems = value.loadedItems();
Assert.assertEquals(1, loadedItems.size());

InputItemProgress loadedItem = loadedItems.iterator().next();
FileItemProgress fileItem = (FileItemProgress) loadedItem;
Assert.assertEquals("edge_knows.json", fileItem.name());
// Offset 2: both JSON lines were read, one failed insertion.
Assert.assertEquals(2, fileItem.offset());
});

// Reload without modification
args = new String[]{
"-f", structPath("reload_json_failure_files/struct.json"),
"-s", configPath("reload_json_failure_files/schema.groovy"),
"-g", GRAPH,
"-h", SERVER,
"--incremental-mode", "true",
"--reload-failure", "true",
"--check-vertex", "true",
"--num-threads", "2",
"--max-parse-errors", "1",
"--test-mode", "false"
};
// No exception throw, but error line still exist
HugeGraphLoader.main(args);
// NOTE(review): sleep presumably lets async failure-file writes finish
// before the file is inspected below — confirm there is no better hook.
Thread.sleep(1000);

// Reload with modification
File structDir = FileUtils.getFile(structPath(
"reload_json_failure_files/struct"));
File currentDir = FileUtils.getFile(structPath(
"reload_json_failure_files/struct/current/"));
File[] files = currentDir.listFiles();
Assert.assertNotNull(files);
Assert.assertEquals(1, files.length);

// The failure file keeps the offending record; rewrite it so the edge
// now references existing vertices (marko -> tom).
File knowsFailureFile = files[0];
List<String> failureLines = FileUtils.readLines(knowsFailureFile,
Constants.CHARSET);
Assert.assertEquals(2, failureLines.size());
Assert.assertEquals("{\"source_name\": \"marko1\", \"target_name\": " +
"\"vadas1\", \"date\": \"2013-02-20 13:00:00\"," +
"\"weight\": 1.0}",
failureLines.get(1));

failureLines.remove(1);
failureLines.add("{\"source_name\": \"marko\", \"target_name\": " +
"\"tom\", \"date\": \"2013-02-20 13:00:00\"," +
"\"weight\": 1.0}");
FileUtils.writeLines(knowsFailureFile, failureLines, false);

// No exception throw, and error line doesn't exist
HugeGraphLoader.main(args);

// The corrected failure record is now inserted: two edges in total.
edges = CLIENT.graph().listEdges();
Assert.assertEquals(2, edges.size());

FileUtils.forceDeleteOnExit(structDir);
}
}
22 changes: 22 additions & 0 deletions src/test/resources/reload_json_failure_files/schema.groovy
@@ -0,0 +1,22 @@
// Define schema
// Property keys referenced by the vertex and edge labels below; keys must
// exist before the labels that use them are created.
schema.propertyKey("name").asText().ifNotExist().create();
schema.propertyKey("age").asInt().ifNotExist().create();
schema.propertyKey("city").asText().ifNotExist().create();
schema.propertyKey("weight").asDouble().ifNotExist().create();
schema.propertyKey("lang").asText().ifNotExist().create();
schema.propertyKey("date").asDate().ifNotExist().create();
schema.propertyKey("price").asDouble().ifNotExist().create();

// Person vertices are identified by their primary key "name";
// "age" and "city" may be absent.
schema.vertexLabel("person")
.properties("name", "age", "city")
.primaryKeys("name")
.nullableKeys("age", "city")
.ifNotExist()
.create();

// person --knows--> person edges; the vertex label must already exist.
schema.edgeLabel("knows")
.sourceLabel("person")
.targetLabel("person")
.properties("date", "weight")
.ifNotExist()
.create();
35 changes: 35 additions & 0 deletions src/test/resources/reload_json_failure_files/struct.json
@@ -0,0 +1,35 @@
{
"vertices": [
{
"label": "person",
"input": {
"type": "${source_type}",
"path": "${store_path}/vertex_person.csv",
"format": "CSV",
"header": ["name", "age", "city"],
"charset": "UTF-8",
"skipped_line": {
"regex": "(^#|^//).*"
}
},
"null_values": ["NULL", "null", ""]
}
],
"edges": [
{
"label": "knows",
"source": ["source_name"],
"target": ["target_name"],
"input": {
"type": "${source_type}",
"path": "${store_path}/edge_knows.json",
"format": "JSON",
"date_format": "yyyy-MM-dd HH:mm:ss"
},
"field_mapping": {
"source_name": "name",
"target_name": "name"
}
}
]
}
37 changes: 37 additions & 0 deletions src/test/resources/reload_json_failure_files/struct_hdfs.json
@@ -0,0 +1,37 @@
{
"vertices": [
{
"label": "person",
"input": {
"type": "${source_type}",
"path": "${store_path}/vertex_person.csv",
"core_site_path": "src/test/resources/hdfs_with_core_site_path/core-site.xml",
"format": "CSV",
"header": ["name", "age", "city"],
"charset": "UTF-8",
"skipped_line": {
"regex": "(^#|^//).*"
}
},
"null_values": ["NULL", "null", ""]
}
],
"edges": [
{
"label": "knows",
"source": ["source_name"],
"target": ["target_name"],
"input": {
"type": "${source_type}",
"path": "${store_path}/edge_knows.json",
"core_site_path": "src/test/resources/hdfs_with_core_site_path/core-site.xml",
"format": "JSON",
"date_format": "yyyy-MM-dd HH:mm:ss"
},
"field_mapping": {
"source_name": "name",
"target_name": "name"
}
}
]
}

0 comments on commit 62727dd

Please sign in to comment.