Add doc for data loader (#397)
tianliplus committed Jun 10, 2021
1 parent e4d0ebd commit 3fa1131
Showing 7 changed files with 107 additions and 14 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -631,11 +631,11 @@ jobs:
if: always()
run: |
export JAVA_HOME=/usr/lib/jvm/default-java/
yes | /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
rm -rf /tmp/hadoop* || true
helm uninstall graphstore || true
kubectl delete pvc -l app.kubernetes.io/instance=graphstore --wait=false || true
kubectl delete pod graphstore-graphscope-store-service-frontend-test-rpc-service --wait=false || true
rm -rf /tmp/hadoop* || true
sudo docker rmi -f registry.cn-hongkong.aliyuncs.com/graphscope/graphstore:${{ github.sha }} || true
8 changes: 1 addition & 7 deletions .github/workflows/hadoop_scripts/prepare_hadoop.sh
@@ -11,12 +11,6 @@ cp $BASE_DIR/hdfs-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml
sed s/DEFAULT_FS/${FS//\//\\/}/ $BASE_DIR/core-site.xml.template > $HADOOP_HOME/etc/hadoop/core-site.xml
sed -i 's/\${JAVA_HOME}/\/usr\/lib\/jvm\/default-java\//' $HADOOP_HOME/etc/hadoop/hadoop-env.sh

if grep "$(cat ~/.ssh/id_rsa.pub)" ~/.ssh/known_hosts; then
echo "Already in known hosts."
else
cat ~/.ssh/id_rsa.pub >> ~/.ssh/known_hosts
fi

$HADOOP_HOME/bin/hdfs namenode -format
yes | $HADOOP_HOME/sbin/start-dfs.sh
HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" $HADOOP_HOME/sbin/start-dfs.sh

93 changes: 93 additions & 0 deletions interactive_engine/src/data_load_tools/README.md
@@ -0,0 +1,93 @@
# Data Load Tools
This is a toolset for bulk loading data from raw files into the GraphScope persistent storage service. It includes a Hadoop MapReduce job for offline data building and a tool for ingesting and committing the offline output to the persistent storage service.

## Quick Start
### Prerequisites

- Java compilation environment (Maven 3.5+ / JDK 1.8), if you need to build the tools from source code
- Hadoop cluster (version 2.x) that can run MapReduce jobs and has HDFS enabled
- A running GIE instance with the persistent storage service (the graph schema should be properly defined)

### Get Binary

If you have the distribution package `maxgraph.tar.gz`, decompress it. You will find the MapReduce job jar `data_load_tools-0.0.1-SNAPSHOT.jar` under `maxgraph/lib/` and the executable `load_tool.sh` under `maxgraph/bin/`.

If you want to build from source code, run `mvn clean package -DskipTests`. The compiled jar `data_load_tools-0.0.1-SNAPSHOT.jar` will be in the `target/` directory. Since `load_tool.sh` is just a wrapper around the `java` command, `data_load_tools-0.0.1-SNAPSHOT.jar` alone is enough for the following demonstration.
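A minimal build sketch, assuming you start from the repository root and that the module builds on its own (the module path is taken from this repository's layout):

```
$ cd interactive_engine/src/data_load_tools
$ mvn clean package -DskipTests
$ ls target/data_load_tools-0.0.1-SNAPSHOT.jar
```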

### Usage
The data loading tools assume the original data files are stored in HDFS. Each file should represent either a vertex type or a relationship of an edge type. Below is sample data for a vertex type `person`; a sample for an edge type follows it.

```
id|name
1000|Alice
1001|Bob
...
```
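For an edge type, each line carries the primary-key columns of the source and destination vertices plus the edge properties. A hypothetical sample for the `knows` edge used in the mapping example below (the file name, header, and column layout here are illustrative):

```
src_id|dst_id|date
1000|1001|20100313
...
```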

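If the raw files are still on your local filesystem, copy them into HDFS first. A minimal sketch using standard HDFS shell commands, with the sample paths from the configuration example below (adjust to your environment):

```
$ hdfs dfs -mkdir -p /tmp/ldbc_sample
$ hdfs dfs -put person_0_0.csv person_knows_person_0_0.csv /tmp/ldbc_sample/
```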
The data loading procedure consists of 3 steps:

#### 1. Offline data build

Build the data by running the Hadoop MapReduce job with the following command:

```
$ hadoop jar data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.databuild.OfflineBuild <path/to/config/file>
```

The config file should follow a format recognized by the Java `java.util.Properties` class. Here is an example:

```
split.size=256
partition.num=16
separator=\\|
input.path=/tmp/ldbc_sample
output.path=/tmp/data_output
graph.endpoint=1.2.3.4:55555
column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}}
skip.header=true
```

Details of the parameters are listed below:

| Config key | Required | Default | Description |
| --- | --- | --- | --- |
| split.size| false | 256 | Hadoop map-reduce input data split size in MB |
| partition.num | true | - | Partition num of target graph |
| separator | false | \\\\\| | Separator used to parse the fields in each line |
| input.path | true | - | Input HDFS dir |
| output.path | true | - | Output HDFS dir |
| graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: [GraphScope Store Service](https://github.com/alibaba/GraphScope/tree/main/charts/graphscope-store-service) |
| column.mapping.config | true | - | Mapping info for each input file in JSON format (a formatted example is shown after this table). Each key in the first level should be a file name that can be found under `input.path`, and the corresponding value defines the mapping info. For a vertex type, the mapping info should include 1) `label` of the vertex type, 2) `propertiesColMap` that describes the mapping from input field to graph property in the format of `{ columnIdx: "propertyName" }`. For an edge type, the mapping info should include 1) `label` of the edge type, 2) `srcLabel` of the source vertex type, 3) `dstLabel` of the destination vertex type, 4) `srcPkColMap` that describes the mapping from input field to the primary keys of the source vertex type, 5) `dstPkColMap` that describes the mapping from input field to the primary keys of the destination vertex type, 6) `propertiesColMap` that describes the mapping from input field to graph property of the edge type |
| skip.header | false | true | Whether to skip the first line of each input file |
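For readability, here is the `column.mapping.config` value from the example above, pretty-printed (in the actual properties file it is written on a single line, as shown earlier):

```
{
  "person_0_0.csv": {
    "label": "person",
    "propertiesColMap": { "0": "id", "1": "name" }
  },
  "person_knows_person_0_0.csv": {
    "label": "knows",
    "srcLabel": "person",
    "dstLabel": "person",
    "srcPkColMap": { "0": "id" },
    "dstPkColMap": { "1": "id" },
    "propertiesColMap": { "2": "date" }
  }
}
```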


#### 2. Ingest data

Now ingest the offline-built data into the graph storage. If you have `load_tool.sh`, then run:

```
$ ./load_tool.sh -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output
```
Or you can run with `java`:

```
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output
```

The offline-built data can be ingested successfully only once; ingesting the same output again will cause errors.

#### 3. Commit data

After the data has been ingested into the graph storage, you need to commit the data loading. The data cannot be read until the commit succeeds. If you have `load_tool.sh`, then run:

```
$ ./load_tool.sh -c commit -d hdfs://1.2.3.4:9000/tmp/data_output
```
Or you can run with `java`:

```
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c commit -d hdfs://1.2.3.4:9000/tmp/data_output
```

**Notice: Data committed later will overwrite data committed earlier for the same vertex types or edge relations.**
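Putting the three steps together, a complete run looks roughly like the following sketch (`my_config.properties` is an illustrative file name; the paths and endpoint are the sample values used above):

```
$ hadoop jar data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.databuild.OfflineBuild my_config.properties
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c commit -d hdfs://1.2.3.4:9000/tmp/data_output
```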

@@ -18,7 +18,7 @@ public static void main(String[] args) throws ParseException, IOException {
.longOpt("dir")
.hasArg()
.argName("HDFS_PATH")
.desc("data directory of HDFS. e.g., hdfs://127.0.0.1:8000/build_output")
.desc("data directory of HDFS. e.g., hdfs://1.2.3.4:9000/build_output")
.build());
options.addOption(Option.builder("h")
.longOpt("help")
@@ -49,26 +49,27 @@ public class DataBuildMapper extends Mapper<LongWritable, Text, BytesWritable, B
private BytesWritable outKey = new BytesWritable();
private BytesWritable outVal = new BytesWritable();
private boolean ldbcCustomize;
private boolean skipHeader;

@Override
protected void setup(Context context) throws IOException {
this.objectMapper = new ObjectMapper();
Configuration conf = context.getConfiguration();
this.separator = conf.get(OfflineBuild.SEPARATOR, "\\|");
this.separator = conf.get(OfflineBuild.SEPARATOR);
String schemaJson = conf.get(OfflineBuild.SCHEMA_JSON);
this.graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema();
this.dataEncoder = new DataEncoder(this.graphSchema);
String columnMappingsJson = conf.get(OfflineBuild.COLUMN_MAPPINGS);
this.fileToColumnMappingInfo = this.objectMapper.readValue(columnMappingsJson,
new TypeReference<Map<String, ColumnMappingInfo>>() {});
this.ldbcCustomize = conf.getBoolean(OfflineBuild.LDBC_CUSTOMIZE, false);

this.skipHeader = conf.getBoolean(OfflineBuild.SKIP_HEADER, true);
DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (key.get() == 0L) {
if (skipHeader && key.get() == 0L) {
return;
}
String fullPath = context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE);
@@ -56,6 +56,7 @@ public class OfflineBuild {
public static final String COLUMN_MAPPINGS = "column.mappings";
public static final String LDBC_CUSTOMIZE = "ldbc.customize";
public static final String LOAD_AFTER_BUILD = "load.after.build";
public static final String SKIP_HEADER = "skip.header";

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String propertiesFile = args[0];
@@ -91,6 +92,7 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
String ldbcCustomize = properties.getProperty(LDBC_CUSTOMIZE, "true");
long splitSize = Long.valueOf(properties.getProperty(SPLIT_SIZE, "256")) * 1024 * 1024;
boolean loadAfterBuild = properties.getProperty(LOAD_AFTER_BUILD, "false").equalsIgnoreCase("true");
boolean skipHeader = properties.getProperty(SKIP_HEADER, "true").equalsIgnoreCase("true");
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
@@ -100,6 +102,8 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
String mappings = objectMapper.writeValueAsString(columnMappingInfos);
conf.setStrings(COLUMN_MAPPINGS, mappings);
conf.setBoolean(LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true"));
conf.set(SEPARATOR, properties.getProperty(SEPARATOR, "\\|"));
conf.setBoolean(SKIP_HEADER, skipHeader);
Job job = Job.getInstance(conf, "build graph data");
job.setJarByClass(OfflineBuild.class);
job.setMapperClass(DataBuildMapper.class);
@@ -17,6 +17,7 @@

import com.alibaba.maxgraph.proto.v2.*;
import com.alibaba.maxgraph.v2.common.CompletionCallback;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

public class StoreIngestService extends StoreIngestGrpc.StoreIngestImplBase {
@@ -39,7 +40,7 @@ public void onCompleted(Void res) {

@Override
public void onError(Throwable t) {
responseObserver.onError(t);
responseObserver.onError(Status.INTERNAL.withDescription(t.getMessage()).asRuntimeException());
}
});
}
