add doc && some fix #397

Merged: 6 commits, Jun 10, 2021
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -630,11 +630,11 @@ jobs:
if: always()
run: |
export JAVA_HOME=/usr/lib/jvm/default-java/
yes | /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" /tmp/hadoop-2.10.1/sbin/stop-dfs.sh || true
rm -rf /tmp/hadoop* || true
helm uninstall graphstore || true
kubectl delete pvc -l app.kubernetes.io/instance=graphstore --wait=false || true
kubectl delete pod graphstore-graphscope-store-service-frontend-test-rpc-service --wait=false || true
rm -rf /tmp/hadoop* || true
sudo docker rmi -f registry.cn-hongkong.aliyuncs.com/graphscope/graphstore:${{ github.sha }} || true


8 changes: 1 addition & 7 deletions .github/workflows/hadoop_scripts/prepare_hadoop.sh
@@ -11,12 +11,6 @@ cp $BASE_DIR/hdfs-site.xml $HADOOP_HOME/etc/hadoop/hdfs-site.xml
sed s/DEFAULT_FS/${FS//\//\\/}/ $BASE_DIR/core-site.xml.template > $HADOOP_HOME/etc/hadoop/core-site.xml
sed -i 's/\${JAVA_HOME}/\/usr\/lib\/jvm\/default-java\//' $HADOOP_HOME/etc/hadoop/hadoop-env.sh

if grep "$(cat ~/.ssh/id_rsa.pub)" ~/.ssh/known_hosts; then
echo "Already in known hosts."
else
cat ~/.ssh/id_rsa.pub >> ~/.ssh/known_hosts
fi

$HADOOP_HOME/bin/hdfs namenode -format
yes | $HADOOP_HOME/sbin/start-dfs.sh
HADOOP_SSH_OPTS="-o StrictHostKeyChecking=no" $HADOOP_HOME/sbin/start-dfs.sh

93 changes: 93 additions & 0 deletions interactive_engine/src/data_load_tools/README.md
@@ -0,0 +1,93 @@
# Data Load Tools
This is a toolset for bulk loading data from raw files into the GraphScope persistent storage service. It includes a Hadoop MapReduce job for offline data building, and a tool for ingesting and committing the offline output to the persistent storage service.

## Quick Start
### Prerequisites

- Java build environment (Maven 3.5+ / JDK 1.8), if you need to build the tools from source
- Hadoop cluster (version 2.x) that can run MapReduce jobs and provides HDFS
- A running GIE instance with the persistent storage service (the graph schema should be properly defined)

### Get Binary

If you have the distribution package `maxgraph.tar.gz`, decompress it. You will find the MapReduce job jar `data_load_tools-0.0.1-SNAPSHOT.jar` under `maxgraph/lib/` and the executable `load_tool.sh` under `maxgraph/bin/`.

If you want to build from source, just run `mvn clean package -DskipTests`. The compiled jar `data_load_tools-0.0.1-SNAPSHOT.jar` will be in the `target/` directory. Since `load_tool.sh` is only a thin wrapper around the `java` command, you can use `data_load_tools-0.0.1-SNAPSHOT.jar` directly in the demonstration below.

### Usage
The data loading tools assume that the original data files are stored in HDFS. Each file should represent either a vertex type or a relationship of an edge type. Below is sample data for a vertex type `person`:

```
id|name
1000|Alice
1001|Bob
...
```
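
An edge relationship file follows the same layout: one record per line, one column per mapped field. For example, data for the `knows` edge between two `person` vertices (matching the column mapping shown later in this document) might look like the snippet below; the header names and the date format are only illustrative:

```
person.id|person.id|date
1000|1001|2010-01-02
1000|1002|2010-03-04
...
```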

The data loading procedure consists of 3 steps:

#### 1. Offline data build

Build the data by running the Hadoop MapReduce job with the following command:

```
$ hadoop jar data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.databuild.OfflineBuild <path/to/config/file>
```

The config file should be in a format recognized by Java's `java.util.Properties` class. Here is an example:

```
split.size=256
partition.num=16
separator=\\|
input.path=/tmp/ldbc_sample
output.path=/tmp/data_output
graph.endpoint=1.2.3.4:55555
column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}}
skip.header=true
```
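
Because the file is parsed by `java.util.Properties`, each entry is a plain `key=value` pair and backslashes act as escape characters, which is why the separator above is written as `\\|`. Below is a minimal, illustrative sketch (not part of the tool) of how such a file can be read; the keys match the table that follows:

```
// Illustrative sketch only: reading a config file in this format with java.util.Properties.
// Error handling is omitted for brevity.
import java.io.FileInputStream;
import java.util.Properties;

public class ConfigExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        try (FileInputStream in = new FileInputStream(args[0])) {
            properties.load(in);
        }
        // "\\|" in the file is loaded as the two-character string "\|",
        // which can then be used as a regex matching a literal '|'.
        String separator = properties.getProperty("separator", "\\|");
        int partitionNum = Integer.parseInt(properties.getProperty("partition.num"));
        boolean skipHeader = "true".equalsIgnoreCase(properties.getProperty("skip.header", "true"));
        System.out.println(separator + " " + partitionNum + " " + skipHeader);
    }
}
```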

Details of the parameters are listed below:

| Config key | Required | Default | Description |
| --- | --- | --- | --- |
| split.size | false | 256 | Hadoop MapReduce input data split size in MB |
| partition.num | true | - | Partition number of the target graph |
| separator | false | \\\\\| | Separator used to parse each field in a line |
| input.path | true | - | Input HDFS dir |
| output.path | true | - | Output HDFS dir |
| graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: [GraphScope Store Service](https://github.com/alibaba/GraphScope/tree/main/charts/graphscope-store-service) |
| column.mapping.config | true | - | Mapping info for each input file, in JSON format (see the pretty-printed example below this table). Each top-level key should be a file name that can be found under `input.path`, and the corresponding value defines its mapping info. For a vertex type, the mapping info should include 1) the `label` of the vertex type, and 2) a `propertiesColMap` that describes the mapping from input field to graph property in the format `{ columnIdx: "propertyName" }`. For an edge type, the mapping info should include 1) the `label` of the edge type, 2) the `srcLabel` of the source vertex type, 3) the `dstLabel` of the destination vertex type, 4) a `srcPkColMap` that describes the mapping from input fields to the primary-key properties of the source vertex type, 5) a `dstPkColMap` that describes the mapping from input fields to the primary-key properties of the destination vertex type, and 6) a `propertiesColMap` that describes the mapping from input fields to graph properties of the edge type |
| skip.header | false | true | Whether to skip the first line of each input file |
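
For readability, here is the `column.mapping.config` value from the example above, pretty-printed (in the properties file it is written on a single line):

```
{
  "person_0_0.csv": {
    "label": "person",
    "propertiesColMap": { "0": "id", "1": "name" }
  },
  "person_knows_person_0_0.csv": {
    "label": "knows",
    "srcLabel": "person",
    "dstLabel": "person",
    "srcPkColMap": { "0": "id" },
    "dstPkColMap": { "1": "id" },
    "propertiesColMap": { "2": "date" }
  }
}
```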


#### 2. Ingest data

Now ingest the offline-built data into the graph storage. If you have `load_tool.sh`, run:

```
$ ./load_tool.sh -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output
```

Or you can run it with `java` directly:

```
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output
```

The offline-built data can be ingested successfully only once; ingesting the same output again will cause errors.

#### 3. Commit data

After the data has been ingested into the graph storage, you need to commit the data loading. The data cannot be read until the commit succeeds. If you have `load_tool.sh`, run:

```
$ ./load_tool.sh -c commit -d hdfs://1.2.3.4:9000/tmp/data_output
```

Or you can run it with `java` directly:

```
$ java -cp data_load_tools-0.0.1-SNAPSHOT.jar com.alibaba.maxgraph.dataload.LoadTool -c commit -d hdfs://1.2.3.4:9000/tmp/data_output
```

**Notice: Data committed later will overwrite earlier committed data that has the same vertex types or edge relations.**

@@ -18,7 +18,7 @@ public static void main(String[] args) throws ParseException, IOException {
.longOpt("dir")
.hasArg()
.argName("HDFS_PATH")
.desc("data directory of HDFS. e.g., hdfs://127.0.0.1:8000/build_output")
.desc("data directory of HDFS. e.g., hdfs://1.2.3.4:9000/build_output")
.build());
options.addOption(Option.builder("h")
.longOpt("help")
@@ -49,26 +49,27 @@ public class DataBuildMapper extends Mapper<LongWritable, Text, BytesWritable, B
private BytesWritable outKey = new BytesWritable();
private BytesWritable outVal = new BytesWritable();
private boolean ldbcCustomize;
private boolean skipHeader;

@Override
protected void setup(Context context) throws IOException {
this.objectMapper = new ObjectMapper();
Configuration conf = context.getConfiguration();
this.separator = conf.get(OfflineBuild.SEPARATOR, "\\|");
this.separator = conf.get(OfflineBuild.SEPARATOR);
String schemaJson = conf.get(OfflineBuild.SCHEMA_JSON);
this.graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema();
this.dataEncoder = new DataEncoder(this.graphSchema);
String columnMappingsJson = conf.get(OfflineBuild.COLUMN_MAPPINGS);
this.fileToColumnMappingInfo = this.objectMapper.readValue(columnMappingsJson,
new TypeReference<Map<String, ColumnMappingInfo>>() {});
this.ldbcCustomize = conf.getBoolean(OfflineBuild.LDBC_CUSTOMIZE, false);

this.skipHeader = conf.getBoolean(OfflineBuild.SKIP_HEADER, true);
DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00"));
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (key.get() == 0L) {
if (skipHeader && key.get() == 0L) {
return;
}
String fullPath = context.getConfiguration().get(MRJobConfig.MAP_INPUT_FILE);
@@ -56,6 +56,7 @@ public class OfflineBuild {
public static final String COLUMN_MAPPINGS = "column.mappings";
public static final String LDBC_CUSTOMIZE = "ldbc.customize";
public static final String LOAD_AFTER_BUILD = "load.after.build";
public static final String SKIP_HEADER = "skip.header";

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String propertiesFile = args[0];
@@ -91,6 +92,7 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
String ldbcCustomize = properties.getProperty(LDBC_CUSTOMIZE, "true");
long splitSize = Long.valueOf(properties.getProperty(SPLIT_SIZE, "256")) * 1024 * 1024;
boolean loadAfterBuild = properties.getProperty(LOAD_AFTER_BUILD, "false").equalsIgnoreCase("true");
boolean skipHeader = properties.getProperty(SKIP_HEADER, "true").equalsIgnoreCase("true");
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
@@ -100,6 +102,8 @@ public static void main(String[] args) throws IOException, ClassNotFoundExceptio
String mappings = objectMapper.writeValueAsString(columnMappingInfos);
conf.setStrings(COLUMN_MAPPINGS, mappings);
conf.setBoolean(LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true"));
conf.set(SEPARATOR, properties.getProperty(SEPARATOR, "\\|"));
conf.setBoolean(SKIP_HEADER, skipHeader);
Job job = Job.getInstance(conf, "build graph data");
job.setJarByClass(OfflineBuild.class);
job.setMapperClass(DataBuildMapper.class);
@@ -17,6 +17,7 @@

import com.alibaba.maxgraph.proto.v2.*;
import com.alibaba.maxgraph.v2.common.CompletionCallback;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

public class StoreIngestService extends StoreIngestGrpc.StoreIngestImplBase {
@@ -39,7 +40,7 @@ public void onCompleted(Void res) {

@Override
public void onError(Throwable t) {
responseObserver.onError(t);
responseObserver.onError(Status.INTERNAL.withDescription(t.getMessage()).asRuntimeException());
}
});
}
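
The change above converts an arbitrary server-side exception into a well-formed gRPC `INTERNAL` status before it is returned, so clients receive a status code and description instead of an opaque error. A hypothetical client-side sketch (not part of this PR; the `rpcCall` lambda stands in for the actual generated stub invocation) might handle it like this:

```
// Hypothetical client-side sketch; the Runnable stands in for whatever generated
// blocking-stub call performs the ingest request.
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

public class IngestErrorHandlingExample {
    static void callIngest(Runnable rpcCall) {
        try {
            rpcCall.run();
        } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Status.Code.INTERNAL) {
                // The server wrapped the original exception; its message is carried
                // in the status description set by StoreIngestService.
                System.err.println("Ingest failed on the server: " + e.getStatus().getDescription());
            } else {
                throw e;
            }
        }
    }
}
```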