From 8ab7b1b8483e60a9ae960a92f815dd52b0a80c23 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 24 May 2023 15:14:46 +0800 Subject: [PATCH] Overhaul data-load-tools (#2715) - Add Volume for dataloading - Refactor data-load-tools - Speed up ingesting by multithreading download. --- .github/workflows/gss.yml | 4 +- .github/workflows/k8s-ci.yml | 2 +- Makefile | 2 +- docs/storage_engine/groot.md | 90 ++++--- interactive_engine/assembly/README.md | 4 +- interactive_engine/assembly/groot.xml | 2 +- interactive_engine/assembly/pom.xml | 4 +- .../assembly/src/bin/groot/store_ctl.sh | 16 +- .../groot/common/config/DataLoadConfig.java | 67 +++++ interactive_engine/data-load-tool/README.md | 71 +++--- interactive_engine/data-load-tool/pom.xml | 21 ++ .../data-load-tool/src/bin/load_tool.sh | 54 ++-- .../groot/dataload/CommitDataCommand.java | 11 +- .../groot/dataload/DataCommand.java | 114 +++------ .../groot/dataload/IngestDataCommand.java | 26 +- .../graphscope/groot/dataload/LoadTool.java | 51 +--- .../graphscope/groot/dataload/OSSFileObj.java | 127 ---------- .../groot/dataload/databuild/BytesRef.java | 6 +- .../groot/dataload/databuild/Codec.java | 10 +- .../databuild/ColumnMappingConfig.java | 29 --- .../dataload/databuild/DataBuildMapper.java | 26 +- .../databuild/DataBuildMapperOdps.java | 32 +-- .../databuild/DataBuildPartitioner.java | 10 +- .../databuild/DataBuildPartitionerOdps.java | 13 +- .../databuild/DataBuildReducerOdps.java | 92 +++---- .../groot/dataload/databuild/DataEncoder.java | 6 +- .../dataload/databuild/OfflineBuild.java | 80 +++--- .../dataload/databuild/OfflineBuildOdps.java | 237 +++++++++--------- .../dataload/databuild/SstOutputFormat.java | 8 +- .../dataload/databuild/SstRecordWriter.java | 11 +- .../dataload/util/AbstractFileSystem.java | 22 ++ .../groot/dataload/util/FSFactory.java | 23 ++ .../graphscope/groot/dataload/util/OSSFS.java | 182 ++++++++++++++ .../groot/dataload/util/VolumeFS.java | 182 ++++++++++++++ .../groot/dataload/util/ZipUtil.java | 60 +++++ .../groot/dataload/LoadToolSpark.scala | 11 +- .../assembly/groot/src/store/graph.rs | 2 +- interactive_engine/groot-module/pom.xml | 11 + .../graphscope/groot/SnapshotCache.java | 7 +- .../groot/frontend/ClientService.java | 1 + .../schema/ddl/AbstractDropTypeExecutor.java | 8 +- .../groot/store/StoreIngestService.java | 2 +- .../graphscope/groot/store/StoreService.java | 42 ++-- .../groot/store/external/ExternalStorage.java | 69 ++++- .../groot/store/external/HdfsStorage.java | 13 +- .../groot/store/external/OssStorage.java | 45 +++- .../groot/store/external/VolumeStorage.java | 65 +++++ .../groot/store/jna/JnaGraphStore.java | 74 +----- interactive_engine/pom.xml | 65 +++-- interactive_engine/proto/sdk/client.proto | 1 + .../proto/store_ingest_service.proto | 2 +- .../compiler/schema/DefaultGraphSchema.java | 2 +- .../sdkcommon/schema/PropertyValue.java | 3 + .../graphscope/groot/sdk/GrootClient.java | 8 +- k8s/dockerfiles/graphscope-store.Dockerfile | 2 +- python/graphscope/framework/graph_schema.py | 6 +- 56 files changed, 1285 insertions(+), 849 deletions(-) create mode 100644 interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java mode change 100644 => 100755 interactive_engine/data-load-tool/src/bin/load_tool.sh delete mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/OSSFileObj.java delete mode 100644 
interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/ColumnMappingConfig.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/AbstractFileSystem.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/FSFactory.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/OSSFS.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java create mode 100644 interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/ZipUtil.java create mode 100644 interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/VolumeStorage.java diff --git a/.github/workflows/gss.yml b/.github/workflows/gss.yml index 39beb367ea3b..76eb0e739a2a 100644 --- a/.github/workflows/gss.yml +++ b/.github/workflows/gss.yml @@ -76,7 +76,7 @@ jobs: export RUSTC_WRAPPER=/usr/local/bin/sccache sccache --start-server cd ${GITHUB_WORKSPACE}/interactive_engine - mvn clean install -P groot,groot-assembly -Drust.compile.mode=debug -DskipTests -Dgroot.compile.feature="column_filter_push_down" --quiet + mvn clean install -P groot -Drust.compile.mode=debug -DskipTests -Dgroot.compile.feature="column_filter_push_down" --quiet sccache --show-stats @@ -92,7 +92,7 @@ jobs: export SCCACHE_DIR=~/.cache/sccache export RUSTC_WRAPPER=/usr/local/bin/sccache cd ${GITHUB_WORKSPACE}/interactive_engine - mvn clean install -P groot,groot-assembly -Drust.compile.mode=debug -DskipTests --quiet + mvn clean install -P groot -Drust.compile.mode=debug -DskipTests --quiet sccache --show-stats diff --git a/.github/workflows/k8s-ci.yml b/.github/workflows/k8s-ci.yml index a40e95d916a2..0fe5fae6bcdd 100644 --- a/.github/workflows/k8s-ci.yml +++ b/.github/workflows/k8s-ci.yml @@ -692,7 +692,7 @@ jobs: minikube image load graphscope/learning:${SHORT_SHA} export PYTHONPATH=${GITHUB_WORKSPACE}/python:${PYTHONPATH} - cd ${GITHUB_WORKSPACE}/interactive_engine && mvn clean install --quiet -DskipTests -Drust.compile.skip=true -P graphscope,graphscope-assembly + cd ${GITHUB_WORKSPACE}/interactive_engine && mvn clean install --quiet -DskipTests -Drust.compile.skip=true -P graphscope cd ${GITHUB_WORKSPACE}/interactive_engine/tests # ./function_test.sh 8111 1 ./function_test.sh 8112 2 diff --git a/Makefile b/Makefile index 47e21cdff7b1..435efeb84c4e 100644 --- a/Makefile +++ b/Makefile @@ -144,7 +144,7 @@ interactive: $(INTERACTIVE_DIR)/assembly/target/graphscope.tar.gz $(INTERACTIVE_DIR)/assembly/target/graphscope.tar.gz: cd $(INTERACTIVE_DIR) && \ - mvn package -DskipTests -Drust.compile.mode=$(BUILD_TYPE) -P graphscope,graphscope-assembly -Drevision=$(VERSION) --quiet + mvn package -DskipTests -Drust.compile.mode=$(BUILD_TYPE) -P graphscope -Drevision=$(VERSION) --quiet learning-install: learning mkdir -p $(INSTALL_PREFIX) diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index d1e0f50726a1..d76aa7452dbc 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -410,41 +410,47 @@ The loading process contains three steps: 3. Commit to the online service so that data is ready for serving queries ##### Build: Building a partitioned graph -Build data by running the hadoop map-reduce job with following command. 
-```bash -./data_load/bin/load_tool.sh hadoop-build -``` -The config file should follow a format that is recognized by Java `java.util.Properties` class. + Build data by running the hadoop map-reduce job with following command: + + ``` + $ ./load_tool.sh build + ``` -Here is an example: - -```properties + The config file should follow a format that is recognized by Java `java.util.Properties` class. Here is an example: + +``` split.size=256 separator=\\| input.path=/tmp/ldbc_sample output.path=/tmp/data_output -graph.endpoint=: +graph.endpoint=1.2.3.4:55555 column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}} skip.header=true +load.after.build=true +# This is not required when load.after.build=true +# hadoop.endpoint=127.0.0.1:9000 +# ``` + + Details of the parameters are listed below: + + | Config key | Required | Default | Description | + |-----------------------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + | split.size | false | 256 | Hadoop map-reduce input data split size in MB | + | separator | false | \\\\\| | Separator used to parse each field in a line | + | input.path | true | - | Input HDFS dir | + | output.path | true | - | Output HDFS dir | + | graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: [GraphScope Store Service](https://github.com/alibaba/GraphScope/tree/main/charts/graphscope-store) | + | column.mapping.config | true | - | Mapping info for each input file in JSON format. Each key in the first level should be a fileName that can be found in the `input.path`, and the corresponding value defines the mapping info. For a vertex type, the mapping info should includes 1) `label` of the vertex type, 2) `propertiesColMap` that describes the mapping from input field to graph property in the format of `{ columnIdx: "propertyName" }`. 
For an edge type, the mapping info should includes 1) `label` of the edge type, 2) `srcLabel` of the source vertex type, 3) `dstLabel` of the destination vertex type, 4) `srcPkColMap` that describes the mapping from input field to graph property of the primary keys in the source vertex type, 5) `dstPkColMap` that describes the mapping from input field to graph property of the primary keys in the destination vertex type, 6) `propertiesColMap` that describes the mapping from input field to graph property of the edge type | + | skip.header | false | true | Whether to skip the first line of the input file | + | load.after.build | false | false | Whether to immediately ingest and commit the builded files | + | hadoop.endpoint | false | - | Endpoint of hadoop cluster in the format of :. Not required when `load.after.build` is set to true | + + After data building completed, you can find the output files in the `output.path` of HDFS. The output files includes a + meta file named `META`, an empty file named `_SUCCESS`, and some data files that one for each partition named in the + pattern of `part-r-xxxxx.sst`. The layout of the output directory should look like: + ``` - -Details of the parameters are listed below: - -| **Config key** | **Required** | **Default** | **Description** | -| --- | --- | --- | --- | -| split.size | false | 256 | Hadoop map-reduce input data split size in MB | -| separator | false | \\\\| | Seperator used to parse each field in a line | -| input.path | true | N/A | Input HDFS dir | -| output.path | true | N/A | Output HDFS dir | -| graph.endpoint | true | N/A | GRPC endpoint of groot. You can find the endpoint in previous section, or use `helm status demo`to get it. | -| column.mapping.config | true | N/A | Mapping info for each input file in JSON format. Each key in the first level should be a fileName that can be found in the input.path, and the corresponding value defines the mapping info. For a vertex type, the mapping info should includes 1) label of the vertex type, 2) propertiesColMap that describes the mapping from input field to graph property in the format of { columnIdx: “propertyName” }. For an edge type, the mapping info should includes 1) label of the edge type, 2) srcLabel of the source vertex type, 3) dstLabel of the destination vertex type, 4) srcPkColMap that describes the mapping from input field to graph property of the primary keys in the source vertex type, 5) dstPkColMap that describes the mapping from input field to graph property of the primary keys in the destination vertex type, 6) propertiesColMap that describes the mapping from input field to graph property of the edge type | -| skip.header | false | true | Whether to skip the first line of the input file | - - -After data building completed, you can find the output files in the `output.path` of HDFS. The output files includes a meta file named `META`, an empty file named `_SUCCESS`, and some data files that one for each partition named in the pattern of `part-r-xxxxx.sst`. The layout of the output directory should look like: - -```properties /tmp/data_output |- META |- _SUCCESS @@ -454,21 +460,29 @@ After data building completed, you can find the output files in the `output.path ... ``` -##### Ingest: Loading graph partitions +If `load.after.build=true`, then you can skip step 2 and 3. +Else, please proceed to ingest and commit. -Now ingest the offline built data into the graph storage:
NOTE: You need to make sure that the HDFS endpoint that can be accessed from the processes of the graph store. -```bash -./data_load/bin/load_tool.sh -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output -``` -The offline built data can be ingested successfully only once, otherwise errors will occur. +#### 2. Loading graph partitions + + Now ingest the offline built data into the graph storage. Run: + + ``` + $ ./load_data.sh ingest + ``` -##### Commit: Commit to store service + The offline built data can be ingested successfully only once, otherwise errors will occur. + +#### 3. Commit to store service + + After data ingested into graph storage, you need to commit data loading. The data will not be able to read until committed successfully. Run: + + ``` + $ ./load_data.sh commit + ``` + + **Note: The later committed data will overwrite the earlier committed data which have same vertex types or edge relations.** -After data ingested into graph storage, you need to commit data loading. The data will not be able to read until committed successfully. -```bash -./data_load/bin/load_tool.sh -c commit -d hdfs://1.2.3.4:9000/tmp/data_output -``` -Notice: The later committed data will overwrite the earlier committed data which have same vertex types or edge relations. ### Realtime Write diff --git a/interactive_engine/assembly/README.md b/interactive_engine/assembly/README.md index 9565ffa0f289..1e5fd0fbb743 100644 --- a/interactive_engine/assembly/README.md +++ b/interactive_engine/assembly/README.md @@ -2,5 +2,5 @@ This will build graphscope or groot into an assembly archive. # Usage -`mvn package -P graphscope,graphscope-assembly` will generate a graphscope.tar.gz under `target/`. -`mvn package -P groot,groot-assembly` will generate a groot.tar.gz under `target/`. \ No newline at end of file +`mvn package -P graphscope` will generate a graphscope.tar.gz under `target/`. +`mvn package -P groot` will generate a groot.tar.gz under `target/`. 
\ No newline at end of file diff --git a/interactive_engine/assembly/groot.xml b/interactive_engine/assembly/groot.xml index abbe8768e999..d0d2e69503c6 100644 --- a/interactive_engine/assembly/groot.xml +++ b/interactive_engine/assembly/groot.xml @@ -1,7 +1,7 @@ - groot-assembly + groot tar.gz diff --git a/interactive_engine/assembly/pom.xml b/interactive_engine/assembly/pom.xml index 37d6e74278ea..036aecb20b10 100644 --- a/interactive_engine/assembly/pom.xml +++ b/interactive_engine/assembly/pom.xml @@ -20,7 +20,7 @@ - graphscope-assembly + graphscope @@ -49,7 +49,7 @@ - groot-assembly + groot diff --git a/interactive_engine/assembly/src/bin/groot/store_ctl.sh b/interactive_engine/assembly/src/bin/groot/store_ctl.sh index 5413eb94783b..ff2edc737f6a 100755 --- a/interactive_engine/assembly/src/bin/groot/store_ctl.sh +++ b/interactive_engine/assembly/src/bin/groot/store_ctl.sh @@ -2,9 +2,7 @@ # # groot command tool -set -x -set -e -set -o pipefail +set -xeo pipefail usage() { cat < >(tee -a "${LOG_DIR}/${LOG_NAME}.out") 2> >(tee -a "${LOG_DIR}/${LOG_NAME}.err" >&2) } -start_load_tools() { - _setup_env - java -cp "${GROOT_HOME}/lib/data-load-tool-0.0.1-SNAPSHOT.jar" \ - com.alibaba.graphscope.groot.dataload.LoadTool "$@" -} - # start groot server start_server() { _setup_env @@ -138,13 +129,10 @@ while test $# -ne 0; do -h|--help) usage; exit ;; start_max_node) start_max_node "gaia" "$@"; exit;; start_server) start_server "$@"; exit;; - start_load_tools) start_load_tools "$@"; exit;; *) echo "unrecognized option or command '${arg}'" usage; exit;; esac done -set +e -set +o pipefail -set +x +set +xeo pipefail diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java new file mode 100644 index 000000000000..fe036514b473 --- /dev/null +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/DataLoadConfig.java @@ -0,0 +1,67 @@ +package com.alibaba.graphscope.groot.common.config; + +public class DataLoadConfig { + + // Get property + + /** universal configurations **/ + public static final String GRAPH_ENDPOINT = "graph.endpoint"; + + public static final String COLUMN_MAPPING_CONFIG = "column.mapping.config"; + + public static final String LOAD_AFTER_BUILD = "load.after.build"; + + public static final String SPLIT_SIZE = "split.size"; + + public static final String UNIQUE_PATH = "unique.path"; // generated automatically for each task + public static final String USER_NAME = "auth.username"; + public static final String PASS_WORD = "auth.password"; + + /** job on HDFS configurations **/ + + // Input and output + public static final String INPUT_PATH = "input.path"; + + public static final String OUTPUT_PATH = "output.path"; + public static final String SEPARATOR = "separator"; + public static final String SKIP_HEADER = "skip.header"; + public static final String LDBC_CUSTOMIZE = "ldbc.customize"; + /* end */ + + /** job on ODPS configurations **/ + public static final String DATA_SINK_TYPE = "data.sink.type"; // hdfs, oss, volume + // The table format is `project.table` or `table`; + // For partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2` + public static final String OUTPUT_TABLE = "output.table"; // a dummy table + /* end */ + + // Set property + public static final String SCHEMA_JSON = "schema.json"; + public static final String COLUMN_MAPPINGS = "column.mappings"; + public 
static final String META_INFO = "meta.info"; + + public static final String META_FILE_NAME = "META"; + + /** OSS configurations **/ + public static final String OSS_ENDPOINT = "oss.endpoint"; + + public static final String OSS_ACCESS_ID = "oss.access.id"; + public static final String OSS_ACCESS_KEY = "oss.access.key"; + + public static final String OSS_BUCKET_NAME = "oss.bucket.name"; + public static final String OSS_OBJECT_NAME = "oss.object.name"; + public static final String OSS_INFO_URL = "oss.info.url"; + /* end */ + + /** ODPS Volume configurations **/ + public static final String ODPS_VOLUME_PROJECT = "odps.volume.project"; + + public static final String ODPS_VOLUME_NAME = "odps.volume.name"; + public static final String ODPS_VOLUME_PARTSPEC = "odps.volume.partspec"; + + public static final String ODPS_ACCESS_ID = "odps.access.id"; + public static final String ODPS_ACCESS_KEY = "odps.access.key"; + public static final String ODPS_ENDPOINT = "odps.endpoint"; + /* end */ + +} diff --git a/interactive_engine/data-load-tool/README.md b/interactive_engine/data-load-tool/README.md index 24b3b48cacb8..a28c7d851463 100644 --- a/interactive_engine/data-load-tool/README.md +++ b/interactive_engine/data-load-tool/README.md @@ -1,7 +1,7 @@ # Data Load Tools -This is a toolset for bulk loading data from raw files to graphscope persistent storage service. +This is a toolset for bulk loading data from raw files to GraphScope persistent storage service. Currently the tool supports a specific format of the raw data as described in "Data Format", -and the originial data must be located in an HDFS. To load the data files into GraphScope storage, +and the original data must be located in an HDFS. To load the data files into GraphScope storage, users can run the data-loading tool from a terminal on a Client machine, and we assume that Client has access to a Hadoop cluster, which can run MapReduce jobs, have read/write access to the HDFS, and connect to a running GraphScope storage service. @@ -15,15 +15,15 @@ and connect to a running GraphScope storage service. ### Get Binary -If you have the distribution package `groot.tar.gz`, decompress it. Then you can find the map-reduce job jar `data-load-tool-0.0.1-SNAPSHOT.jar` under `groot/lib/` and the executable `load_tool.sh` under `groot/bin/`. +If you have the distribution package `groot.tar.gz`, decompress it. Then you can find the map-reduce job jar `data-load-tool-{VERSION}-SNAPSHOT.jar` under `groot/lib/` and the executable `load_tool.sh` under `groot/bin/`. -If you want to build from source code, just run `mvn clean package -DskipTests`. You can find the compiled jar `data-load-tool-0.0.1-SNAPSHOT.jar` in the `target/` directory. The `load_tool.sh` is just a wrapper for java command, you can only use `data-load-tool-0.0.1-SNAPSHOT.jar` in the following demonstration. +If you want to build from source code, just run `mvn clean package -DskipTests`. You can find the compiled jar `data-load-tool-{VERSION}-SNAPSHOT.jar` in the `target/` directory. The `load_tool.sh` under `data-load-tools/bin/`. ### Data Format The data loading tools assume the original data files are in the HDFS. -Each file should represents either a vertex type or a relationship of an edge type. Below are the sample +Each file should represent either a vertex type or a relationship of an edge type. 
Below are the sample data of a vertex type person and a relationShip person-knows->person of edge type knows: - person.csv @@ -42,10 +42,10 @@ person_id|person_id_1|date ``` The first line of the data file is a header that describes the key of each field. The header is not required. -If there is no header in the data file, you need to set `skip.header` to `true` in the data building process +If there is no header in the data file, you need to set `skip.header` to `false` in the data building process (For details, see params description in "Building a partitioned graph"). -The rest lines are the data records. Each line represents one record. Data fields are seperated by a custom seperator +The rest lines are the data records. Each line represents one record. Data fields are seperated by a custom separator ("|" in the example above). In the vertex data file `person.csv`, `id` field and `name` field are the primary-key and the property of the vertex type `person` respectively. In the edge data file `person_knows_person.csv`, `person_id` field is the primary-key of the source vertex, `person_id_1` field is the primary-key of the destination vertex, `date` @@ -67,33 +67,38 @@ The loading process contains three steps: Build data by running the hadoop map-reduce job with following command: ``` - $ hadoop jar data-load-tool-0.0.1-SNAPSHOT.jar databuild.com.alibaba.graphscope.groot.dataload.OfflineBuild + $ ./load_tool.sh build ``` The config file should follow a format that is recognized by Java `java.util.Properties` class. Here is an example: - ``` - split.size=256 +``` +split.size=256 separator=\\| input.path=/tmp/ldbc_sample output.path=/tmp/data_output graph.endpoint=1.2.3.4:55555 column.mapping.config={"person_0_0.csv":{"label":"person","propertiesColMap":{"0":"id","1":"name"}},"person_knows_person_0_0.csv":{"label":"knows","srcLabel":"person","dstLabel":"person","srcPkColMap":{"0":"id"},"dstPkColMap":{"1":"id"},"propertiesColMap":{"2":"date"}}} skip.header=true - ``` +load.after.build=true +# This is not required when load.after.build=true +# hadoop.endpoint=127.0.0.1:9000 + ``` Details of the parameters are listed below: - | Config key | Required | Default | Description | - | --- | --- | --- | --- | - | split.size| false | 256 | Hadoop map-reduce input data split size in MB | - | separator | false | \\\\\| | Seperator used to parse each field in a line | - | input.path | true | - | Input HDFS dir | - | output.path | true | - | Output HDFS dir | - | graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: [GraphScope Store Service](https://github.com/alibaba/GraphScope/tree/main/charts/graphscope-store) | - | column.mapping.config | true | - | Mapping info for each input file in JSON format. Each key in the first level should be a fileName that can be found in the `input.path`, and the corresponding value defines the mapping info. For a vertex type, the mapping info should includes 1) `label` of the vertex type, 2) `propertiesColMap` that describes the mapping from input field to graph property in the format of `{ columnIdx: "propertyName" }`. 
For an edge type, the mapping info should includes 1) `label` of the edge type, 2) `srcLabel` of the source vertex type, 3) `dstLabel` of the destination vertex type, 4) `srcPkColMap` that describes the mapping from input field to graph property of the primary keys in the source vertex type, 5) `dstPkColMap` that describes the mapping from input field to graph property of the primary keys in the destination vertex type, 6) `propertiesColMap` that describes the mapping from input field to graph property of the edge type | - |skip.header|false|true|Whether to skip the first line of the input file| - + | Config key | Required | Default | Description | + |-----------------------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| + | split.size | false | 256 | Hadoop map-reduce input data split size in MB | + | separator | false | \\\\\| | Separator used to parse each field in a line | + | input.path | true | - | Input HDFS dir | + | output.path | true | - | Output HDFS dir | + | graph.endpoint | true | - | RPC endpoint of the graph storage service. You can get the RPC endpoint following this document: [GraphScope Store Service](https://github.com/alibaba/GraphScope/tree/main/charts/graphscope-store) | + | column.mapping.config | true | - | Mapping info for each input file in JSON format. Each key in the first level should be a fileName that can be found in the `input.path`, and the corresponding value defines the mapping info. For a vertex type, the mapping info should includes 1) `label` of the vertex type, 2) `propertiesColMap` that describes the mapping from input field to graph property in the format of `{ columnIdx: "propertyName" }`. For an edge type, the mapping info should includes 1) `label` of the edge type, 2) `srcLabel` of the source vertex type, 3) `dstLabel` of the destination vertex type, 4) `srcPkColMap` that describes the mapping from input field to graph property of the primary keys in the source vertex type, 5) `dstPkColMap` that describes the mapping from input field to graph property of the primary keys in the destination vertex type, 6) `propertiesColMap` that describes the mapping from input field to graph property of the edge type | + | skip.header | false | true | Whether to skip the first line of the input file | + | load.after.build | false | false | Whether to immediately ingest and commit the builded files | + | hadoop.endpoint | false | - | Endpoint of hadoop cluster in the format of :. Not required when `load.after.build` is set to true | + After data building completed, you can find the output files in the `output.path` of HDFS. 
The output files includes a meta file named `META`, an empty file named `_SUCCESS`, and some data files that one for each partition named in the pattern of `part-r-xxxxx.sst`. The layout of the output directory should look like: @@ -108,33 +113,25 @@ skip.header=true ... ``` +If `load.after.build=true`, then you can skip step 2 and 3. +Else, please proceed to ingest and commit. + #### 2. Loading graph partitions - Now ingest the offline built data into the graph storage. If you have `load_data.sh`, then run: + Now ingest the offline built data into the graph storage. Run: ``` - $ ./load_data.sh -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output - ``` - Or you can run with `java`: - - ``` - $ java -cp data-load-tool-0.0.1-SNAPSHOT.jar com.alibaba.graphscope.groot.dataload.LoadTool -c ingest -d hdfs://1.2.3.4:9000/tmp/data_output + $ ./load_data.sh ingest ``` The offline built data can be ingested successfully only once, otherwise errors will occur. #### 3. Commit to store service - After data ingested into graph storage, you need to commit data loading. The data will not be able to read until committed successfully. If you have `load_data.sh`, then run: - - ``` - $ ./load_data.sh -c commit -d hdfs://1.2.3.4:9000/tmp/data_output - ``` - Or you can run with `java`: + After data ingested into graph storage, you need to commit data loading. The data will not be able to read until committed successfully. Run: ``` - $ java -cp data-load-tool-0.0.1-SNAPSHOT.jar com.alibaba.graphscope.groot.dataload.LoadTool -c commit -d hdfs://1.2.3.4:9000/tmp/data_output + $ ./load_data.sh commit ``` - **Notice: The later committed data will overwrite the earlier committed data which have same vertex types or edge relations.** - + **Note: The later committed data will overwrite the earlier committed data which have same vertex types or edge relations.** diff --git a/interactive_engine/data-load-tool/pom.xml b/interactive_engine/data-load-tool/pom.xml index c7c548646a94..4d87678be92e 100644 --- a/interactive_engine/data-load-tool/pom.xml +++ b/interactive_engine/data-load-tool/pom.xml @@ -13,6 +13,10 @@ data-load-tool + + com.alibaba.graphscope + interactive-common + com.alibaba.graphscope sdk @@ -33,6 +37,7 @@ odps-sdk-mapred provided + com.fasterxml.jackson.core jackson-core @@ -45,6 +50,10 @@ org.slf4j slf4j-api + + ch.qos.logback + logback-classic + commons-codec commons-codec @@ -153,6 +162,18 @@ com.google.protobuf shaded.com.google.protobuf + + org.apache.http + shaded.org.apache.http + + + com.fasterxml + shaded.com.fasterxml + + + io.grpc + shaded.io.grpc + diff --git a/interactive_engine/data-load-tool/src/bin/load_tool.sh b/interactive_engine/data-load-tool/src/bin/load_tool.sh old mode 100644 new mode 100755 index acea1e5d0f3e..00101ef251d8 --- a/interactive_engine/data-load-tool/src/bin/load_tool.sh +++ b/interactive_engine/data-load-tool/src/bin/load_tool.sh @@ -2,8 +2,7 @@ # # load_data tools -set -e -set -o pipefail +set -eo pipefail SCRIPT="$0" while [ -h "$SCRIPT" ] ; do @@ -20,22 +19,45 @@ done LOAD_TOOL_HOME=$(dirname "$SCRIPT") LOAD_TOOL_HOME=$(cd "$LOAD_TOOL_HOME"; pwd) LOAD_TOOL_HOME=$(dirname "$LOAD_TOOL_HOME") -JAR_FILE="$(echo "$LOAD_TOOL_HOME"/lib/*.jar | tr ' ' ':')" -if [ "$1" = "hadoop-build" ]; then - hadoop_build_config=$2 - if [ -z "$hadoop_build_config" ]; then - echo "no valid hadoop build config file" + +usage() { +cat < +EOF +} + +COMMAND=$1 +CONFIG=$2 + +if [ "$COMMAND" = "-h" ] || [ "$COMMAND" = "--help" ]; then + usage + exit 0 +fi + +check_arguments() { + if [ -z "$CONFIG" ]; 
then + echo "No valid config file" + usage exit 1 fi - exec hadoop jar $JAR_FILE com.alibaba.graphscope.groot.dataload.databuild.OfflineBuild $hadoop_build_config -else - if [ ! -z "$JAVA_HOME" ]; then - JAVA="$JAVA_HOME/bin/java" - fi - if [ ! -x "$JAVA" ]; then - echo "no valid JAVA_HOME" >&2 - exit 1 + + JAR_FILE=$(find "$LOAD_TOOL_HOME"/.. -maxdepth 2 -type f -iname "data-load-tool-*.jar" | head -1) + + if [[ -z "${JAR_FILE}" ]]; then + echo "Error: Could not find data-load-tool-*.jar within the $LOAD_TOOL_HOME" + exit 1 fi - exec "$JAVA" -cp $JAR_FILE com.alibaba.graphscope.groot.dataload.LoadTool "$@" +} + +if [ "$COMMAND" = "build" ]; then + check_arguments + exec hadoop jar "$JAR_FILE" com.alibaba.graphscope.groot.dataload.databuild.OfflineBuild "$CONFIG" +elif [ "$COMMAND" = "ingest" ] || [ "$COMMAND" = "commit" ]; then + check_arguments + exec java -cp "$JAR_FILE" com.alibaba.graphscope.groot.dataload.LoadTool -c "$COMMAND" -f "$CONFIG" +else + usage fi diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/CommitDataCommand.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/CommitDataCommand.java index a3e0c7cbe56c..da77ee5a93a7 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/CommitDataCommand.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/CommitDataCommand.java @@ -11,10 +11,8 @@ import java.util.Map; public class CommitDataCommand extends DataCommand { - - public CommitDataCommand(String dataPath, boolean isFromOSS, String uniquePath) - throws IOException { - super(dataPath, isFromOSS, uniquePath); + public CommitDataCommand(String dataPath) throws IOException { + super(dataPath); } public void run() { @@ -40,6 +38,9 @@ public void run() { } tableToTarget.put(tableId, builder.build()); } - client.commitDataLoad(tableToTarget, this.uniquePath); + System.out.println("Commit data. 
unique path: " + uniquePath); + client.commitDataLoad(tableToTarget, uniquePath); + System.out.println("Commit complete."); + client.close(); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/DataCommand.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/DataCommand.java index 53c25746bacf..63e2b5eb617c 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/DataCommand.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/DataCommand.java @@ -1,8 +1,9 @@ package com.alibaba.graphscope.groot.dataload; import com.alibaba.graphscope.compiler.api.schema.GraphSchema; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; import com.alibaba.graphscope.groot.dataload.databuild.ColumnMappingInfo; -import com.alibaba.graphscope.groot.dataload.util.HttpClient; +import com.alibaba.graphscope.groot.dataload.util.OSSFS; import com.alibaba.graphscope.sdkcommon.schema.GraphSchemaMapper; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -15,116 +16,71 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.HttpURLConnection; import java.nio.file.Paths; import java.util.*; public abstract class DataCommand { - protected String configPath; + private final String configPath; + protected String dataRootPath; protected String graphEndpoint; protected GraphSchema schema; protected Map columnMappingInfos; protected String metaData; protected String username; protected String password; - - protected String ossAccessID; - protected String ossAccessKey; protected String uniquePath; - protected final String metaFileName = "META"; - protected final String OSS_ENDPOINT = "oss.endpoint"; - protected final String OSS_ACCESS_ID = "oss.access.id"; - protected final String OSS_ACCESS_KEY = "oss.access.key"; - protected final String OSS_BUCKET_NAME = "oss.bucket.name"; - protected final String OSS_OBJECT_NAME = "oss.object.name"; - protected final String OSS_INFO_URL = "oss.info.url"; - protected final String USER_NAME = "auth.username"; - protected final String PASS_WORD = "auth.password"; + protected Map ingestConfig; - public DataCommand(String configPath, boolean isFromOSS, String uniquePath) throws IOException { + public DataCommand(String configPath) throws IOException { this.configPath = configPath; - this.uniquePath = uniquePath; - initialize(isFromOSS); + initialize(); } - private HashMap getOSSInfoFromURL(String URL) throws IOException { - HttpClient client = new HttpClient(); - HttpURLConnection conn = null; - try { - conn = client.createConnection(URL); - conn.setConnectTimeout(5000); - conn.setReadTimeout(5000); - ObjectMapper mapper = new ObjectMapper(); - TypeReference> typeRef = - new TypeReference>() {}; - return mapper.readValue(conn.getInputStream(), typeRef); - } finally { - if (conn != null) { - conn.disconnect(); - } + private void initialize() throws IOException { + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(this.configPath)) { + properties.load(is); } - } + username = properties.getProperty(DataLoadConfig.USER_NAME); + password = properties.getProperty(DataLoadConfig.PASS_WORD); + graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); + String outputPath = properties.getProperty(DataLoadConfig.OUTPUT_PATH); + String metaFilePath = new 
Path(outputPath, DataLoadConfig.META_FILE_NAME).toString(); + String dataSinkType = properties.getProperty(DataLoadConfig.DATA_SINK_TYPE, "HDFS"); + + if (dataSinkType.equalsIgnoreCase("HDFS")) { + FileSystem fs = new Path(this.configPath).getFileSystem(new Configuration()); - private void initialize(boolean isFromOSS) throws IOException { - if (isFromOSS) { - Properties properties = new Properties(); - try (InputStream is = new FileInputStream(this.configPath)) { - properties.load(is); - } catch (IOException e) { - throw e; + try (FSDataInputStream inputStream = fs.open(new Path(metaFilePath))) { + this.metaData = inputStream.readUTF(); } - this.ossAccessID = properties.getProperty(OSS_ACCESS_ID); - this.ossAccessKey = properties.getProperty(OSS_ACCESS_KEY); - String ossEndpoint = properties.getProperty(OSS_ENDPOINT); - String ossBucketName = properties.getProperty(OSS_BUCKET_NAME); - String ossObjectName = properties.getProperty(OSS_OBJECT_NAME); - if (this.ossAccessID == null || this.ossAccessID.isEmpty()) { - String URL = properties.getProperty(OSS_INFO_URL); - HashMap o = getOSSInfoFromURL(URL); - this.ossAccessID = o.get("ossAccessID"); - this.ossAccessKey = o.get("ossAccessKey"); - ossEndpoint = o.get("ossEndpoint"); - ossBucketName = o.get("ossBucketName"); - ossObjectName = o.get("ossObjectName"); + dataRootPath = outputPath; + } else if (dataSinkType.equalsIgnoreCase("VOLUME")) { + throw new IOException( + "Volume only supports load.after.build mode, which is running build, ingest and" + + " commit at the same driver."); + } else if (dataSinkType.equalsIgnoreCase("OSS")) { + try (OSSFS fs = new OSSFS(properties)) { + dataRootPath = fs.getQualifiedPath(); + this.metaData = fs.readToString(metaFilePath); + ingestConfig = fs.getConfig(); } - - username = properties.getProperty(USER_NAME); - password = properties.getProperty(PASS_WORD); - - configPath = - "oss://" - + Paths.get( - Paths.get(ossEndpoint, ossBucketName).toString(), - ossObjectName); - - Map ossInfo = new HashMap<>(); - ossInfo.put(OSS_ENDPOINT, ossEndpoint); - ossInfo.put(OSS_ACCESS_ID, ossAccessID); - ossInfo.put(OSS_ACCESS_KEY, ossAccessKey); - OSSFileObj ossFileObj = new OSSFileObj(ossInfo); - ossObjectName = Paths.get(ossObjectName, uniquePath).toString(); - - this.metaData = ossFileObj.readBuffer(ossBucketName, ossObjectName, metaFileName); - ossFileObj.close(); } else { - FileSystem fs = new Path(this.configPath).getFileSystem(new Configuration()); - try (FSDataInputStream inputStream = fs.open(new Path(this.configPath, "META"))) { - this.metaData = inputStream.readUTF(); - } + throw new IOException("Unsupported data sink: " + dataSinkType); } ObjectMapper objectMapper = new ObjectMapper(); Map metaMap = objectMapper.readValue(metaData, new TypeReference>() {}); - this.graphEndpoint = metaMap.get("endpoint"); this.schema = GraphSchemaMapper.parseFromJson(metaMap.get("schema")).toGraphSchema(); this.columnMappingInfos = objectMapper.readValue( metaMap.get("mappings"), new TypeReference>() {}); - this.uniquePath = metaMap.get("unique_path"); + this.uniquePath = metaMap.get(DataLoadConfig.UNIQUE_PATH); + dataRootPath = Paths.get(dataRootPath, uniquePath).toString(); } public abstract void run(); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/IngestDataCommand.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/IngestDataCommand.java index 2ca57c88208d..dbd65328f762 100644 --- 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/IngestDataCommand.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/IngestDataCommand.java @@ -2,18 +2,11 @@ import com.alibaba.graphscope.groot.sdk.GrootClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.util.HashMap; public class IngestDataCommand extends DataCommand { - private static final Logger logger = LoggerFactory.getLogger(IngestDataCommand.class); - - public IngestDataCommand(String dataPath, boolean isFromOSS, String uniquePath) - throws IOException { - super(dataPath, isFromOSS, uniquePath); + public IngestDataCommand(String dataPath) throws IOException { + super(dataPath); } public void run() { @@ -23,15 +16,10 @@ public void run() { .setUsername(username) .setPassword(password) .build(); - configPath = configPath + "/" + uniquePath; - if (ossAccessID == null || ossAccessKey == null) { - logger.warn("ossAccessID or ossAccessKey is null, using default configuration."); - client.ingestData(configPath); - } else { - HashMap config = new HashMap<>(); - config.put("ossAccessID", ossAccessID); - config.put("ossAccessKey", ossAccessKey); - client.ingestData(configPath, config); - } + System.out.println("Ingesting data with config:"); + ingestConfig.forEach((key, value) -> System.out.println(key + "=" + value)); + System.out.println("Data root path: " + dataRootPath); + client.ingestData(dataRootPath, ingestConfig); + System.out.println("Ingest complete."); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/LoadTool.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/LoadTool.java index a536e9d6fc55..9b9b44aa670b 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/LoadTool.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/LoadTool.java @@ -12,14 +12,12 @@ public class LoadTool { - public static void ingest(String configPath, boolean isFromOSS, String uniquePath) - throws IOException { - new IngestDataCommand(configPath, isFromOSS, uniquePath).run(); + public static void ingest(String configPath) throws IOException { + new IngestDataCommand(configPath).run(); } - public static void commit(String configPath, boolean isFromOSS, String uniquePath) - throws IOException { - new CommitDataCommand(configPath, isFromOSS, uniquePath).run(); + public static void commit(String configPath) throws IOException { + new CommitDataCommand(configPath).run(); } public static void main(String[] args) throws ParseException, IOException { @@ -32,50 +30,27 @@ public static void main(String[] args) throws ParseException, IOException { .desc("supported COMMAND: ingest / commit") .build()); options.addOption( - Option.builder("d") - .longOpt("dir") + Option.builder("f") + .longOpt("config") .hasArg() - .argName("HDFS_PATH") - .desc("data directory of HDFS. e.g., hdfs://1.2.3.4:9000/build_output") - .build()); - options.addOption( - Option.builder("oss") - .longOpt("ossconfigfile") - .hasArg() - .argName("OSS_CONFIG_FILE") - .desc("OSS Config File. e.g., config.init") - .build()); - options.addOption( - Option.builder("u") - .longOpt("uniquepath") - .hasArg() - .argName("UNIQUE_PATH") - .desc("unique path from uuid. 
e.g., unique_path") + .argName("CONFIG") + .desc("path to configuration file") .build()); options.addOption(Option.builder("h").longOpt("help").desc("print this message").build()); CommandLineParser parser = new DefaultParser(); CommandLine commandLine = parser.parse(options, args); String command = commandLine.getOptionValue("command"); - String configPath = null; - String uniquePath = null; - boolean isFromOSS = false; - if (commandLine.hasOption("oss")) { - isFromOSS = true; - configPath = commandLine.getOptionValue("oss"); - uniquePath = commandLine.getOptionValue("u"); - } else { - configPath = commandLine.getOptionValue("dir"); - } + String configPath = commandLine.getOptionValue("config"); if (commandLine.hasOption("help") || command == null) { printHelp(options); } else if (command.equalsIgnoreCase("ingest")) { - ingest(configPath, isFromOSS, uniquePath); + ingest(configPath); } else if (command.equalsIgnoreCase("commit")) { - commit(configPath, isFromOSS, uniquePath); + commit(configPath); } else if (command.equalsIgnoreCase("ingestAndCommit")) { - ingest(configPath, isFromOSS, uniquePath); - commit(configPath, isFromOSS, uniquePath); + ingest(configPath); + commit(configPath); } else { printHelp(options); } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/OSSFileObj.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/OSSFileObj.java deleted file mode 100644 index 48aa95afe80d..000000000000 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/OSSFileObj.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.alibaba.graphscope.groot.dataload; - -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.model.CreateDirectoryRequest; -import com.aliyun.oss.model.OSSObject; -import com.aliyun.oss.model.PutObjectRequest; -import com.aliyun.oss.model.UploadFileRequest; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.file.Paths; -import java.util.Map; - -public class OSSFileObj { - protected final String OSS_ENDPOINT = "oss.endpoint"; - protected final String OSS_ACCESS_ID = "oss.access.id"; - protected final String OSS_ACCESS_KEY = "oss.access.key"; - - protected String ossEndpoint = null; - protected String ossAccessID = null; - protected String ossAccessKey = null; - - protected OSS ossClient = null; - - public OSSFileObj(Map ossInfo) throws IOException { - this.ossEndpoint = ossInfo.get(OSS_ENDPOINT); - this.ossAccessID = ossInfo.get(OSS_ACCESS_ID); - this.ossAccessKey = ossInfo.get(OSS_ACCESS_KEY); - - if (!ossEndpoint.startsWith("http")) { - ossEndpoint = "https://" + ossEndpoint; - } - - try { - this.ossClient = new OSSClientBuilder().build(ossEndpoint, ossAccessID, ossAccessKey); - } catch (OSSException oe) { - throw new IOException(oe); - } catch (ClientException ce) { - throw new IOException(ce); - } - } - - public void uploadFile(String ossBucketName, String ossObjectName, String fileName) - throws IOException { - try { - PutObjectRequest putObjectRequest = - new PutObjectRequest( - ossBucketName, - Paths.get(ossObjectName, fileName).toString(), - new File(fileName)); - ossClient.putObject(putObjectRequest); - } catch (OSSException oe) { - throw new IOException(oe); - } catch (ClientException ce) { - throw new IOException(ce); - } - } - - public void 
uploadFileWithCheckPoint( - String ossBucketName, String ossObjectName, String fileName) throws IOException { - try { - UploadFileRequest uploadFileRequest = - new UploadFileRequest( - ossBucketName, Paths.get(ossObjectName, fileName).toString()); - uploadFileRequest.setUploadFile(fileName); - uploadFileRequest.setPartSize(10 * 1024 * 1024); - uploadFileRequest.setEnableCheckpoint(true); - ossClient.uploadFile(uploadFileRequest); - } catch (OSSException oe) { - throw new IOException(oe); - } catch (ClientException ce) { - throw new IOException(ce); - } catch (Throwable ce) { - throw new IOException(ce); - } - } - - public void createDirectory(String ossBucketName, String ossObjectName, String dirName) - throws IOException { - try { - String directoryName = Paths.get(ossObjectName, dirName).toString(); - CreateDirectoryRequest createDirectoryRequest = - new CreateDirectoryRequest(ossBucketName, directoryName); - ossClient.createDirectory(createDirectoryRequest); - } catch (OSSException oe) { - throw new IOException(oe); - } catch (ClientException ce) { - throw new IOException(ce); - } - } - - public String readBuffer(String ossBucketName, String ossObjectName, String fileName) - throws IOException { - String data = ""; - try { - OSSObject ossObject = - ossClient.getObject( - ossBucketName, Paths.get(ossObjectName, fileName).toString()); - - BufferedReader reader = - new BufferedReader(new InputStreamReader(ossObject.getObjectContent())); - while (true) { - String line = reader.readLine(); - if (line == null) break; - data += line; - } - reader.close(); - ossObject.close(); - } catch (OSSException oe) { - throw new IOException(oe); - } catch (ClientException ce) { - throw new IOException(ce); - } - return data; - } - - public void close() throws IOException { - if (ossClient != null) { - ossClient.shutdown(); - } - } -} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/BytesRef.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/BytesRef.java index 6e5e6a4833bf..751f3cb45097 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/BytesRef.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/BytesRef.java @@ -16,9 +16,9 @@ package com.alibaba.graphscope.groot.dataload.databuild; public class BytesRef { - private byte[] array; - private int offset; - private int length; + private final byte[] array; + private final int offset; + private final int length; public BytesRef(byte[] array, int offset, int length) { this.array = array; diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Codec.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Codec.java index 73659cf55ff6..5a00d6833d20 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Codec.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/Codec.java @@ -30,11 +30,11 @@ public class Codec { private static final Logger logger = LoggerFactory.getLogger(Codec.class); - private int version; - private List propertyDefs; - private List offsets; - private byte[] nullBytesHolder; - private int fixedPropertiesCount; + private final int version; + private final List propertyDefs; + private final List offsets; + private final 
byte[] nullBytesHolder; + private final int fixedPropertiesCount; public Codec(GraphElement graphElement) { this.version = graphElement.getVersionId(); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/ColumnMappingConfig.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/ColumnMappingConfig.java deleted file mode 100644 index 2962cbb638ca..000000000000 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/ColumnMappingConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.graphscope.groot.dataload.databuild; - -public class ColumnMappingConfig { - private String fileName; - private FileColumnMapping fileColumnMapping; - - public String getFileName() { - return fileName; - } - - public FileColumnMapping getFileColumnMapping() { - return fileColumnMapping; - } -} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java index 94d3b22fa69d..b31b760d1093 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapper.java @@ -15,6 +15,7 @@ import com.alibaba.graphscope.compiler.api.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.compiler.api.schema.*; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; import com.alibaba.graphscope.sdkcommon.schema.GraphSchemaMapper; import com.alibaba.graphscope.sdkcommon.schema.PropertyValue; import com.fasterxml.jackson.core.type.TypeReference; @@ -46,26 +47,25 @@ public class DataBuildMapper extends Mapper fileToColumnMappingInfo; - private ObjectMapper objectMapper; - private BytesWritable outKey = new BytesWritable(); - private BytesWritable outVal = new BytesWritable(); - private boolean ldbcCustomize; + private final BytesWritable outKey = new BytesWritable(); + private final BytesWritable outVal = new BytesWritable(); + private boolean ldbcCustomize; // Do some customize data type transformations for LDBC data private boolean skipHeader; @Override protected void setup(Context context) throws IOException { - this.objectMapper = new ObjectMapper(); Configuration conf = context.getConfiguration(); - this.separator = conf.get(OfflineBuild.SEPARATOR); - String schemaJson = conf.get(OfflineBuild.SCHEMA_JSON); + this.separator = conf.get(DataLoadConfig.SEPARATOR); + String schemaJson = conf.get(DataLoadConfig.SCHEMA_JSON); this.graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema(); this.dataEncoder = new DataEncoder(this.graphSchema); - String 
columnMappingsJson = conf.get(OfflineBuild.COLUMN_MAPPINGS); + String columnMappingsJson = conf.get(DataLoadConfig.COLUMN_MAPPINGS); + ObjectMapper objectMapper = new ObjectMapper(); this.fileToColumnMappingInfo = - this.objectMapper.readValue( + objectMapper.readValue( columnMappingsJson, new TypeReference>() {}); - this.ldbcCustomize = conf.getBoolean(OfflineBuild.LDBC_CUSTOMIZE, false); - this.skipHeader = conf.getBoolean(OfflineBuild.SKIP_HEADER, true); + this.ldbcCustomize = conf.getBoolean(DataLoadConfig.LDBC_CUSTOMIZE, false); + this.skipHeader = conf.getBoolean(DataLoadConfig.SKIP_HEADER, true); DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00")); } @@ -79,7 +79,7 @@ protected void map(LongWritable key, Text value, Context context) String fileName = fullPath.substring(fullPath.lastIndexOf('/') + 1); ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(fileName); if (columnMappingInfo == null) { - logger.warn("Mapper: ignore [" + fileName + "]"); + logger.warn("Mapper: ignore [{}], fullPath is [{}]", fileName, fullPath); return; } @@ -164,7 +164,7 @@ private Map buildPropertiesMap( + "] -> [" + propertyId + "], data [" - + items + + Arrays.toString(items) + "]"); } DataType dataType = propertyDef.getDataType(); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java index e598244de7e9..de3c2a72630a 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildMapperOdps.java @@ -17,6 +17,7 @@ import com.alibaba.graphscope.compiler.api.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.compiler.api.schema.*; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; import com.alibaba.graphscope.sdkcommon.schema.GraphSchemaMapper; import com.alibaba.graphscope.sdkcommon.schema.PropertyValue; import com.aliyun.odps.data.Record; @@ -33,8 +34,6 @@ public class DataBuildMapperOdps extends MapperBase { private static final Logger logger = LoggerFactory.getLogger(DataBuildMapperOdps.class); - public static final SimpleDateFormat SRC_FMT = - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); public static final SimpleDateFormat DST_FMT = new SimpleDateFormat("yyyyMMddHHmmssSSS"); public static String charSet = "ISO8859-1"; @@ -42,22 +41,24 @@ public class DataBuildMapperOdps extends MapperBase { private DataEncoder dataEncoder; private Map fileToColumnMappingInfo; - private ObjectMapper objectMapper; private Record outKey; private Record outVal; @Override public void setup(TaskContext context) throws IOException { - this.outKey = context.createMapOutputKeyRecord(); - this.outVal = context.createMapOutputValueRecord(); + outKey = context.createMapOutputKeyRecord(); + outVal = context.createMapOutputValueRecord(); - this.objectMapper = new ObjectMapper(); - String schemaJson = context.getJobConf().get(OfflineBuildOdps.SCHEMA_JSON); - this.graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema(); - this.dataEncoder = new DataEncoder(this.graphSchema); - String columnMappingsJson = context.getJobConf().get(OfflineBuildOdps.COLUMN_MAPPINGS); - this.fileToColumnMappingInfo = - this.objectMapper.readValue( + String metaData = 
context.getJobConf().get(DataLoadConfig.META_INFO); + ObjectMapper objectMapper = new ObjectMapper(); + Map metaMap = + objectMapper.readValue(metaData, new TypeReference>() {}); + String schemaJson = metaMap.get(DataLoadConfig.SCHEMA_JSON); + graphSchema = GraphSchemaMapper.parseFromJson(schemaJson).toGraphSchema(); + dataEncoder = new DataEncoder(graphSchema); + String columnMappingsJson = metaMap.get(DataLoadConfig.COLUMN_MAPPINGS); + fileToColumnMappingInfo = + objectMapper.readValue( columnMappingsJson, new TypeReference>() {}); DST_FMT.setTimeZone(TimeZone.getTimeZone("GMT+00:00")); } @@ -67,7 +68,10 @@ public void map(long recordNum, Record record, TaskContext context) throws IOExc String tableName = context.getInputTableInfo().getTableName(); ColumnMappingInfo columnMappingInfo = this.fileToColumnMappingInfo.get(tableName); if (columnMappingInfo == null) { - logger.warn("Mapper: ignore [{}]", tableName); + logger.warn( + "Mapper: ignore [{}], table info is [{}]", + tableName, + context.getInputTableInfo()); return; } @@ -157,7 +161,7 @@ private Map buildPropertiesMap( + "] -> [" + propertyId + "], data [" - + items + + Arrays.toString(items) + "]"); } DataType dataType = propertyDef.getDataType(); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitioner.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitioner.java index 788b4660c1fd..fa6ce0e2b7f6 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitioner.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitioner.java @@ -30,14 +30,8 @@ public class DataBuildPartitioner extends Partitioner 24) { - // Edge - partitionKey = keyBuf.getLong(8); - } else { - // Vertex - partitionKey = keyBuf.getLong(8); - } + // for edge, its key.getLength() > 24 + long partitionKey = keyBuf.getLong(8); return PartitionUtils.getPartitionIdFromKey(partitionKey, numPartitions); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitionerOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitionerOdps.java index 983190253290..1316b01f78db 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitionerOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildPartitionerOdps.java @@ -31,21 +31,14 @@ public class DataBuildPartitionerOdps extends Partitioner { @Override public int getPartition(Record key, Record value, int numPartitions) { - byte[] keyBytes = null; + byte[] keyBytes; try { keyBytes = ((String) key.get(0)).getBytes(DataBuildMapperOdps.charSet); } catch (UnsupportedEncodingException e) { - throw new RuntimeException("Should not have happened " + e.toString()); + throw new RuntimeException("PartitionerOdps: Should not have happened " + e); } ByteBuffer keyBuf = ByteBuffer.wrap(keyBytes); - long partitionKey; - if (keyBytes.length > 24) { - // Edge - partitionKey = keyBuf.getLong(8); - } else { - // Vertex - partitionKey = keyBuf.getLong(8); - } + long partitionKey = keyBuf.getLong(8); return PartitionUtils.getPartitionIdFromKey(partitionKey, numPartitions); } } diff --git 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java index 8ff92e683b98..099cea3a3b79 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataBuildReducerOdps.java @@ -15,66 +15,51 @@ */ package com.alibaba.graphscope.groot.dataload.databuild; -import com.alibaba.graphscope.groot.dataload.OSSFileObj; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.dataload.util.AbstractFileSystem; +import com.alibaba.graphscope.groot.dataload.util.FSFactory; import com.aliyun.odps.data.Record; import com.aliyun.odps.mapred.ReducerBase; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.codec.binary.Hex; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.nio.file.Paths; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; public class DataBuildReducerOdps extends ReducerBase { - private String ossAccessID = null; - private String ossAccessKey = null; - private String ossEndpoint = null; - private String ossBucketName = null; - private String ossObjectName = null; - private SstRecordWriter sstRecordWriter = null; + private String uniquePath = null; private String taskId = null; private String metaData = null; - private OSSFileObj ossFileObj = null; + private AbstractFileSystem fs = null; private String sstFileName = null; private String chkFileName = null; - private String metaFileName = "META"; - private boolean sstFileEmpty; + private String metaFileName = DataLoadConfig.META_FILE_NAME; @Override public void setup(TaskContext context) throws IOException { - this.ossAccessID = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_ID); - this.ossAccessKey = context.getJobConf().get(OfflineBuildOdps.OSS_ACCESS_KEY); - this.ossEndpoint = context.getJobConf().get(OfflineBuildOdps.OSS_ENDPOINT); - this.ossBucketName = context.getJobConf().get(OfflineBuildOdps.OSS_BUCKET_NAME); - this.ossObjectName = context.getJobConf().get(OfflineBuildOdps.OSS_OBJECT_NAME); - this.metaData = context.getJobConf().get(OfflineBuildOdps.META_INFO); - this.uniquePath = context.getJobConf().get(OfflineBuildOdps.UNIQUE_PATH); - - if (!ossEndpoint.startsWith("http")) { - ossEndpoint = "https://" + ossEndpoint; - } + + metaData = context.getJobConf().get(DataLoadConfig.META_INFO); + ObjectMapper objectMapper = new ObjectMapper(); + Map metaMap = + objectMapper.readValue(metaData, new TypeReference>() {}); + + this.uniquePath = metaMap.get(DataLoadConfig.UNIQUE_PATH); this.taskId = context.getTaskID().toString(); taskId = taskId.substring(taskId.length() - 5); sstFileName = "part-r-" + taskId + ".sst"; chkFileName = "part-r-" + taskId + ".chk"; - Map ossInfo = new HashMap(); - ossInfo.put(OfflineBuildOdps.OSS_ENDPOINT, ossEndpoint); - ossInfo.put(OfflineBuildOdps.OSS_ACCESS_ID, ossAccessID); - ossInfo.put(OfflineBuildOdps.OSS_ACCESS_KEY, ossAccessKey); - - this.ossFileObj = new OSSFileObj(ossInfo); - + this.fs = 
FSFactory.Create(context.getJobConf()); + this.fs.open(context, "w"); /* if ("00000".equals(taskId)) { try { @@ -85,8 +70,6 @@ public void setup(TaskContext context) throws IOException { } */ - ossObjectName = Paths.get(ossObjectName, uniquePath).toString(); - try { this.sstRecordWriter = new SstRecordWriter(sstFileName, DataBuildMapperOdps.charSet); } catch (IOException e) { @@ -99,46 +82,39 @@ public void reduce(Record key, Iterator values, TaskContext context) throws IOException { while (values.hasNext()) { Record value = values.next(); - try { - sstRecordWriter.write((String) key.get(0), (String) value.get(0)); - } catch (IOException e) { - throw e; - } + sstRecordWriter.write((String) key.get(0), (String) value.get(0)); } + context.progress(); } @Override public void cleanup(TaskContext context) throws IOException { - this.sstFileEmpty = sstRecordWriter.empty(); + boolean sstFileEmpty = sstRecordWriter.empty(); try { sstRecordWriter.close(); } catch (IOException e) { e.printStackTrace(); } - String chkData = null; - if (sstFileEmpty) { - chkData = "0"; - } else { - chkData = "1," + getFileMD5(sstFileName); - } + String chkData = sstFileEmpty ? "0" : "1," + getFileMD5(sstFileName); writeFile(chkFileName, chkData); - ossFileObj.uploadFile(ossBucketName, ossObjectName, chkFileName); + + fs.copy(chkFileName, Paths.get(uniquePath, chkFileName).toString()); if (!sstFileEmpty) { - ossFileObj.uploadFileWithCheckPoint(ossBucketName, ossObjectName, sstFileName); + fs.copy(sstFileName, Paths.get(uniquePath, sstFileName).toString()); } - // Only the first task will write the meta + // Only the first task will write the meta if ("00000".equals(taskId)) { try { writeFile(metaFileName, metaData); - ossFileObj.uploadFile(ossBucketName, ossObjectName, metaFileName); + fs.copy(metaFileName, Paths.get(uniquePath, metaFileName).toString()); } catch (IOException e) { throw e; } } - ossFileObj.close(); + fs.close(); } public String getFileMD5(String fileName) throws IOException { @@ -152,17 +128,11 @@ public String getFileMD5(String fileName) throws IOException { MD5.update(buffer, 0, length); } return new String(Hex.encodeHex(MD5.digest())); - } catch (NoSuchAlgorithmException e) { + } catch (NoSuchAlgorithmException | IOException e) { throw new IOException(e); - } catch (IOException e) { - throw e; } finally { - try { - if (fis != null) { - fis.close(); - } - } catch (IOException e) { - throw e; + if (fis != null) { + fis.close(); } } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java index f78b43346b4c..71fcdf84146f 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/DataEncoder.java @@ -32,10 +32,10 @@ public class DataEncoder { private static final long SNAPSHOT_ID = ~0L; - private ByteBuffer scratch = ByteBuffer.allocate(1 << 20); - private Map> labelPkIds = new HashMap<>(); + private final ByteBuffer scratch = ByteBuffer.allocate(1 << 20); + private final Map> labelPkIds = new HashMap<>(); - private Map labelToCodec; + private final Map labelToCodec; public DataEncoder(GraphSchema graphSchema) { this.labelToCodec = buildCodecs(graphSchema); diff --git 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java index 71175b90e26a..fc7c396bc8b0 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuild.java @@ -16,6 +16,7 @@ import com.alibaba.graphscope.compiler.api.schema.GraphEdge; import com.alibaba.graphscope.compiler.api.schema.GraphElement; import com.alibaba.graphscope.compiler.api.schema.GraphSchema; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.sdkcommon.common.DataLoadTarget; import com.alibaba.graphscope.sdkcommon.schema.GraphSchemaMapper; @@ -44,19 +45,6 @@ public class OfflineBuild { private static final Logger logger = LoggerFactory.getLogger(OfflineBuild.class); - public static final String INPUT_PATH = "input.path"; - public static final String OUTPUT_PATH = "output.path"; - public static final String GRAPH_ENDPOINT = "graph.endpoint"; - public static final String SCHEMA_JSON = "schema.json"; - public static final String SEPARATOR = "separator"; - public static final String COLUMN_MAPPING_CONFIG = "column.mapping.config"; - public static final String SPLIT_SIZE = "split.size"; - - public static final String COLUMN_MAPPINGS = "column.mappings"; - public static final String LDBC_CUSTOMIZE = "ldbc.customize"; - public static final String LOAD_AFTER_BUILD = "load.after.build"; - public static final String SKIP_HEADER = "skip.header"; - public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String propertiesFile = args[0]; @@ -64,11 +52,24 @@ public static void main(String[] args) try (InputStream is = new FileInputStream(propertiesFile)) { properties.load(is); } - String inputPath = properties.getProperty(INPUT_PATH); - String outputPath = properties.getProperty(OUTPUT_PATH); - String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG); - String graphEndpoint = properties.getProperty(GRAPH_ENDPOINT); - GrootClient client = GrootClient.newBuilder().setHosts(graphEndpoint).build(); + String inputPath = properties.getProperty(DataLoadConfig.INPUT_PATH); + String outputPath = properties.getProperty(DataLoadConfig.OUTPUT_PATH); + String columnMappingConfigStr = + properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); + + String uniquePath = + properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString()); + + String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); + String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); + + GrootClient client = + GrootClient.newBuilder() + .setHosts(graphEndpoint) + .setUsername(username) + .setPassword(password) + .build(); ObjectMapper objectMapper = new ObjectMapper(); Map columnMappingConfig = objectMapper.readValue( @@ -93,22 +94,30 @@ public static void main(String[] args) (fileName, fileColumnMapping) -> { columnMappingInfos.put(fileName, fileColumnMapping.toColumnMappingInfo(schema)); }); - String ldbcCustomize = properties.getProperty(LDBC_CUSTOMIZE, "true"); - long splitSize = Long.valueOf(properties.getProperty(SPLIT_SIZE, 
"256")) * 1024 * 1024; + String ldbcCustomize = properties.getProperty(DataLoadConfig.LDBC_CUSTOMIZE, "true"); + long splitSize = + Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")) + * 1024 + * 1024; boolean loadAfterBuild = - properties.getProperty(LOAD_AFTER_BUILD, "false").equalsIgnoreCase("true"); - boolean skipHeader = properties.getProperty(SKIP_HEADER, "true").equalsIgnoreCase("true"); + properties + .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") + .equalsIgnoreCase("true"); + boolean skipHeader = + properties.getProperty(DataLoadConfig.SKIP_HEADER, "true").equalsIgnoreCase("true"); + String separator = properties.getProperty(DataLoadConfig.SEPARATOR, "\\|"); + Configuration conf = new Configuration(); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERNODE, splitSize); conf.setLong(CombineTextInputFormat.SPLIT_MINSIZE_PERRACK, splitSize); - conf.setStrings(SCHEMA_JSON, schemaJson); + conf.setStrings(DataLoadConfig.SCHEMA_JSON, schemaJson); String mappings = objectMapper.writeValueAsString(columnMappingInfos); - conf.setStrings(COLUMN_MAPPINGS, mappings); - conf.setBoolean(LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true")); - conf.set(SEPARATOR, properties.getProperty(SEPARATOR, "\\|")); - conf.setBoolean(SKIP_HEADER, skipHeader); + conf.setStrings(DataLoadConfig.COLUMN_MAPPINGS, mappings); + conf.setBoolean(DataLoadConfig.LDBC_CUSTOMIZE, ldbcCustomize.equalsIgnoreCase("true")); + conf.set(DataLoadConfig.SEPARATOR, separator); + conf.setBoolean(DataLoadConfig.SKIP_HEADER, skipHeader); Job job = Job.getInstance(conf, "build graph data"); job.setJarByClass(OfflineBuild.class); job.setMapperClass(DataBuildMapper.class); @@ -122,28 +131,28 @@ public static void main(String[] args) LazyOutputFormat.setOutputFormatClass(job, SstOutputFormat.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileInputFormat.setInputDirRecursive(job, true); - String uniquePath = UuidUtils.getBase64UUIDString(); + Path outputDir = new Path(outputPath, uniquePath); FileOutputFormat.setOutputPath(job, outputDir); if (!job.waitForCompletion(true)) { System.exit(1); } - FileSystem fs = outputDir.getFileSystem(job.getConfiguration()); - String dataPath = fs.makeQualified(outputDir).toString(); Map outputMeta = new HashMap<>(); - outputMeta.put("endpoint", graphEndpoint); - outputMeta.put("schema", schemaJson); - outputMeta.put("mappings", mappings); - outputMeta.put("datapath", dataPath); - outputMeta.put("unique_path", uniquePath); + outputMeta.put(DataLoadConfig.GRAPH_ENDPOINT, graphEndpoint); + outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); + outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mappings); + outputMeta.put(DataLoadConfig.UNIQUE_PATH, uniquePath); + FileSystem fs = outputDir.getFileSystem(job.getConfiguration()); FSDataOutputStream os = fs.create(new Path(outputDir, "META")); os.writeUTF(objectMapper.writeValueAsString(outputMeta)); os.flush(); os.close(); if (loadAfterBuild) { + String dataPath = fs.makeQualified(outputDir).toString(); + logger.info("start ingesting data"); client.ingestData(dataPath); @@ -166,5 +175,6 @@ public static void main(String[] args) } client.commitDataLoad(tableToTarget, uniquePath); } + client.close(); } } diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java 
b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java index 1f73964cdb84..b7c0014b145a 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/OfflineBuildOdps.java @@ -17,14 +17,18 @@ import com.alibaba.graphscope.compiler.api.schema.GraphEdge; import com.alibaba.graphscope.compiler.api.schema.GraphElement; import com.alibaba.graphscope.compiler.api.schema.GraphSchema; -import com.alibaba.graphscope.groot.dataload.util.HttpClient; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.alibaba.graphscope.groot.dataload.util.OSSFS; +import com.alibaba.graphscope.groot.dataload.util.VolumeFS; import com.alibaba.graphscope.groot.sdk.GrootClient; import com.alibaba.graphscope.sdkcommon.common.DataLoadTarget; import com.alibaba.graphscope.sdkcommon.schema.GraphSchemaMapper; import com.alibaba.graphscope.sdkcommon.util.UuidUtils; +import com.aliyun.odps.Odps; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import com.aliyun.odps.mapred.conf.JobConf; +import com.aliyun.odps.mapred.conf.SessionState; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; @@ -37,96 +41,31 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.net.HttpURLConnection; -import java.nio.file.Paths; import java.util.*; public class OfflineBuildOdps { private static final Logger logger = LoggerFactory.getLogger(OfflineBuildOdps.class); - public static final String GRAPH_ENDPOINT = "graph.endpoint"; - public static final String SCHEMA_JSON = "schema.json"; - public static final String SEPARATOR = "separator"; - public static final String COLUMN_MAPPING_CONFIG = "column.mapping.config"; - public static final String SPLIT_SIZE = "split.size"; - - public static final String COLUMN_MAPPINGS = "column.mappings"; - public static final String SKIP_HEADER = "skip.header"; - - public static final String OUTPUT_TABLE = "output.table"; - public static final String OSS_ACCESS_ID = "oss.access.id"; - public static final String OSS_ACCESS_KEY = "oss.access.key"; - public static final String OSS_ENDPOINT = "oss.endpoint"; - public static final String OSS_BUCKET_NAME = "oss.bucket.name"; - public static final String OSS_OBJECT_NAME = "oss.object.name"; - public static final String OSS_INFO_URL = "oss.info.url"; - - public static final String META_INFO = "meta.info"; - public static final String USER_NAME = "auth.username"; - public static final String PASS_WORD = "auth.password"; - public static final String UNIQUE_PATH = "unique.path"; - - private static HashMap getOSSInfoFromURL(String URL) throws IOException { - HttpClient client = new HttpClient(); - HttpURLConnection conn = null; - try { - conn = client.createConnection(URL); - conn.setConnectTimeout(5000); - conn.setReadTimeout(5000); - ObjectMapper mapper = new ObjectMapper(); - TypeReference> typeRef = - new TypeReference>() {}; - return mapper.readValue(conn.getInputStream(), typeRef); - } finally { - if (conn != null) { - conn.disconnect(); - } - } - } + private static Odps odps; public static void main(String[] args) throws IOException { String propertiesFile = args[0]; - String uniquePath = UuidUtils.getBase64UUIDString(); - // User can assign a 
unique path manually. - if (args.length > 1) { - uniquePath = args[1]; - } Properties properties = new Properties(); try (InputStream is = new FileInputStream(propertiesFile)) { properties.load(is); } + odps = SessionState.get().getOdps(); - String outputTable = properties.getProperty(OUTPUT_TABLE); - String ossAccessID = properties.getProperty(OSS_ACCESS_ID); - String ossAccessKey = properties.getProperty(OSS_ACCESS_KEY); - String ossEndpoint = properties.getProperty(OSS_ENDPOINT); - String ossBucketName = properties.getProperty(OSS_BUCKET_NAME); - String ossObjectName = properties.getProperty(OSS_OBJECT_NAME); + String columnMappingConfigStr = + properties.getProperty(DataLoadConfig.COLUMN_MAPPING_CONFIG); + String graphEndpoint = properties.getProperty(DataLoadConfig.GRAPH_ENDPOINT); + String username = properties.getProperty(DataLoadConfig.USER_NAME, ""); + String password = properties.getProperty(DataLoadConfig.PASS_WORD, ""); - if (ossAccessID == null || ossAccessID.isEmpty()) { - String URL = properties.getProperty(OSS_INFO_URL); - HashMap o = getOSSInfoFromURL(URL); - ossAccessID = o.get("ossAccessID"); - ossAccessKey = o.get("ossAccessKey"); - ossEndpoint = o.get("ossEndpoint"); - ossBucketName = o.get("ossBucketName"); - ossObjectName = o.get("ossObjectName"); - } - - // The table format is `project.table` or `table`; - // For partitioned table, the format is `project.table|p1=1/p2=2` or `table|p1=1/p2=2` - String columnMappingConfigStr = properties.getProperty(COLUMN_MAPPING_CONFIG); - String graphEndpoint = properties.getProperty(GRAPH_ENDPOINT); - String username = properties.getProperty(USER_NAME); - String password = properties.getProperty(PASS_WORD); + String uniquePath = + properties.getProperty(DataLoadConfig.UNIQUE_PATH, UuidUtils.getBase64UUIDString()); - GrootClient client = - GrootClient.newBuilder() - .setHosts(graphEndpoint) - .setUsername(username) - .setPassword(password) - .build(); ObjectMapper objectMapper = new ObjectMapper(); Map columnMappingConfig = objectMapper.readValue( @@ -142,8 +81,17 @@ public static void main(String[] args) throws IOException { .setDstLabel(fileColumnMapping.getDstLabel()) .build()); } + + GrootClient client = + GrootClient.newBuilder() + .setHosts(graphEndpoint) + .setUsername(username) + .setPassword(password) + .build(); + GraphSchema schema = client.prepareDataLoad(targets); String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString(); + // number of reduce task int partitionNum = client.getPartitionNum(); Map tableType = new HashMap<>(); @@ -157,31 +105,21 @@ public static void main(String[] args) throws IOException { columnMappingInfos.put(getTableName(fileName), columnMappingInfo); tableType.put(fileName, schema.getElement(columnMappingInfo.getLabelId())); }); - long splitSize = Long.valueOf(properties.getProperty(SPLIT_SIZE, "256")); - boolean skipHeader = properties.getProperty(SKIP_HEADER, "true").equalsIgnoreCase("true"); + long splitSize = Long.parseLong(properties.getProperty(DataLoadConfig.SPLIT_SIZE, "256")); JobConf job = new JobConf(); - job.set(SCHEMA_JSON, schemaJson); String mappings = objectMapper.writeValueAsString(columnMappingInfos); - job.set(COLUMN_MAPPINGS, mappings); - job.set(SEPARATOR, properties.getProperty(SEPARATOR, "\\|")); - job.setBoolean(SKIP_HEADER, skipHeader); - job.set(GRAPH_ENDPOINT, graphEndpoint); - - job.set(OSS_ACCESS_ID, ossAccessID); - job.set(OSS_ACCESS_KEY, ossAccessKey); - job.set(OSS_ENDPOINT, ossEndpoint); - job.set(OSS_BUCKET_NAME, ossBucketName); - 
job.set(OSS_OBJECT_NAME, ossObjectName); // Avoid java sandbox protection job.set("odps.isolation.session.enable", "true"); // Don't introduce legacy jar files job.set("odps.sql.udf.java.retain.legacy", "false"); // Default priority is 9 - job.set("odps.instance.priority", "1"); + job.setInstancePriority(0); job.set("odps.mr.run.mode", "sql"); job.set("odps.mr.sql.group.enable", "true"); + job.setFunctionTimeout(2400); + job.setMemoryForReducerJVM(2048); for (Map.Entry entry : tableType.entrySet()) { if (entry.getValue() instanceof GraphVertex || entry.getValue() instanceof GraphEdge) { @@ -198,53 +136,105 @@ public static void main(String[] args) throws IOException { job.setMapOutputKeySchema(SchemaUtils.fromString("key:string")); job.setMapOutputValueSchema(SchemaUtils.fromString("value:string")); - OutputUtils.addTable(parseTableURL(outputTable), job); - - String dataPath = Paths.get(ossBucketName, ossObjectName).toString(); + String dataSinkType = properties.getProperty(DataLoadConfig.DATA_SINK_TYPE, "VOLUME"); + Map config; + String fullQualifiedDataPath; + if (dataSinkType.equalsIgnoreCase("VOLUME")) { + try (VolumeFS fs = new VolumeFS(properties)) { + fs.setJobConf(job); + config = fs.setConfig(odps); + fullQualifiedDataPath = fs.getQualifiedPath(); + fs.createVolumeIfNotExists(odps); + OutputUtils.addVolume(fs.getVolumeInfo(), job); + } + } else if (dataSinkType.equalsIgnoreCase("OSS")) { + try (OSSFS fs = new OSSFS(properties)) { + fs.setJobConf(job); + config = fs.getConfig(); + fullQualifiedDataPath = fs.getQualifiedPath(); + } + String outputTable = properties.getProperty(DataLoadConfig.OUTPUT_TABLE); + OutputUtils.addTable(parseTableURL(outputTable), job); + } else if (dataSinkType.equalsIgnoreCase("HDFS")) { + throw new IOException("HDFS as a data sink is not supported in ODPS"); + } else { + throw new IOException("Unsupported data sink: " + dataSinkType); + } Map outputMeta = new HashMap<>(); - outputMeta.put("endpoint", graphEndpoint); - outputMeta.put("schema", schemaJson); - outputMeta.put("mappings", mappings); - outputMeta.put("datapath", dataPath); - outputMeta.put("unique_path", uniquePath); - - job.set(META_INFO, objectMapper.writeValueAsString(outputMeta)); - job.set(UNIQUE_PATH, uniquePath); - - System.out.println("uniquePath is: " + uniquePath); - + outputMeta.put(DataLoadConfig.GRAPH_ENDPOINT, graphEndpoint); + outputMeta.put(DataLoadConfig.SCHEMA_JSON, schemaJson); + outputMeta.put(DataLoadConfig.COLUMN_MAPPINGS, mappings); + outputMeta.put(DataLoadConfig.UNIQUE_PATH, uniquePath); + outputMeta.put(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); + + job.set(DataLoadConfig.META_INFO, objectMapper.writeValueAsString(outputMeta)); + job.set(DataLoadConfig.DATA_SINK_TYPE, dataSinkType); try { JobClient.runJob(job); } catch (Exception e) { throw new IOException(e); } + + boolean loadAfterBuild = + properties + .getProperty(DataLoadConfig.LOAD_AFTER_BUILD, "false") + .equalsIgnoreCase("true"); + if (loadAfterBuild) { + fullQualifiedDataPath = fullQualifiedDataPath + uniquePath; + logger.info("start ingesting data from " + fullQualifiedDataPath); + logger.info("Ingesting data with config:"); + config.forEach((key, value) -> logger.info(key + "=" + value)); + try { + client.ingestData(fullQualifiedDataPath, config); + logger.info("start committing bulk load"); + Map tableToTarget = new HashMap<>(); + for (ColumnMappingInfo columnMappingInfo : columnMappingInfos.values()) { + long tableId = columnMappingInfo.getTableId(); + int labelId = columnMappingInfo.getLabelId(); + 
GraphElement graphElement = schema.getElement(labelId); + String label = graphElement.getLabel(); + DataLoadTarget.Builder builder = DataLoadTarget.newBuilder(); + builder.setLabel(label); + if (graphElement instanceof GraphEdge) { + builder.setSrcLabel( + schema.getElement(columnMappingInfo.getSrcLabelId()).getLabel()); + builder.setDstLabel( + schema.getElement(columnMappingInfo.getDstLabelId()).getLabel()); + } + tableToTarget.put(tableId, builder.build()); + } + client.commitDataLoad(tableToTarget, uniquePath); + + } catch (Exception ex) { + logger.error("Failed to ingest/commit data", ex); + client.clearIngest(uniquePath); + client.close(); + throw ex; + } + } + client.close(); } private static String getTableName(String tableFullName) { - String tableName; - if (tableFullName.contains(".")) { - String[] items = tableFullName.split("\\."); - tableName = items[1]; - } else { - tableName = tableFullName; - } - if (tableName.contains("|")) { - String[] items = tableName.split("\\|"); - tableName = items[0]; - } - return tableName; + TableInfo info = parseTableURL(tableFullName); + return info.getTableName(); } - private static TableInfo parseTableURL(String tableFullName) { - String projectName = null; - String tableName = null; + /** + * Parse table URL to @TableInfo + * @param url the pattern of [projectName.]tableName[|partitionSpec] + * @return TableInfo + */ + private static TableInfo parseTableURL(String url) { + String projectName = odps.getDefaultProject(); + String tableName; String partitionSpec = null; - if (tableFullName.contains(".")) { - String[] items = tableFullName.split("\\."); + if (url.contains(".")) { + String[] items = url.split("\\."); projectName = items[0]; tableName = items[1]; } else { - tableName = tableFullName; + tableName = url; } if (tableName.contains("|")) { String[] items = tableName.split("\\|"); @@ -253,9 +243,8 @@ private static TableInfo parseTableURL(String tableFullName) { } TableInfo.TableInfoBuilder builder = TableInfo.builder(); - if (projectName != null) { - builder.projectName(projectName); - } + builder.projectName(projectName); + builder.tableName(tableName); if (partitionSpec != null) { builder.partSpec(partitionSpec); diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java index 2a06e08d117f..3821a75940d4 100644 --- a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstOutputFormat.java @@ -37,10 +37,10 @@ public class SstOutputFormat extends FileOutputFormat { - private SstFileWriter sstFileWriter; - private FileSystem fs; - private String fileName; - private Path path; + private final SstFileWriter sstFileWriter; + private final FileSystem fs; + private final String fileName; + private final Path path; public SstRecordWriter(FileSystem fs, Path path) throws RocksDBException { this.fs = fs; diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java index 1250b65f9914..226f83f5177a 100644 --- 
a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/databuild/SstRecordWriter.java @@ -18,11 +18,10 @@ import org.rocksdb.*; import java.io.IOException; -import java.util.*; public class SstRecordWriter { - private SstFileWriter sstFileWriter; - private String charSet; + private final SstFileWriter sstFileWriter; + private final String charSet; private boolean isEmpty; public SstRecordWriter(String fileName, String charSet) throws IOException { @@ -30,8 +29,8 @@ public SstRecordWriter(String fileName, String charSet) throws IOException { this.charSet = charSet; Options options = new Options(); options.setCreateIfMissing(true) - .setWriteBufferSize(512 << 20) - .setMaxWriteBufferNumber(8) + .setWriteBufferSize(64 << 20) + .setMaxWriteBufferNumber(4) .setTargetFileSizeBase(512 << 20); this.sstFileWriter = new SstFileWriter(new EnvOptions(), options); try { @@ -42,12 +41,12 @@ public SstRecordWriter(String fileName, String charSet) throws IOException { } public void write(String key, String value) throws IOException { - this.isEmpty = false; try { sstFileWriter.put(key.getBytes(charSet), value.getBytes(charSet)); } catch (RocksDBException e) { throw new IOException(e); } + this.isEmpty = false; } public boolean empty() { diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/AbstractFileSystem.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/AbstractFileSystem.java new file mode 100644 index 000000000000..7cd13b90780d --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/AbstractFileSystem.java @@ -0,0 +1,22 @@ +package com.alibaba.graphscope.groot.dataload.util; + +import com.aliyun.odps.mapred.TaskContext; +import com.aliyun.odps.mapred.conf.JobConf; + +import java.io.IOException; + +public abstract class AbstractFileSystem implements AutoCloseable { + + protected AbstractFileSystem() {} + + public abstract void setJobConf(JobConf jobConf); + + public abstract void open(TaskContext context, String mode) throws IOException; + + public abstract String readToString(String fileName) throws IOException; + + public abstract void copy(String srcFile, String dstFile) throws IOException; + + @Override + public void close() {} +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/FSFactory.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/FSFactory.java new file mode 100644 index 000000000000..fb360e0e7e1c --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/FSFactory.java @@ -0,0 +1,23 @@ +package com.alibaba.graphscope.groot.dataload.util; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.aliyun.odps.mapred.conf.JobConf; + +import java.io.IOException; + +public class FSFactory { + public FSFactory() {} + + public static AbstractFileSystem Create(JobConf conf) throws IOException { + String dataSinkType = conf.get(DataLoadConfig.DATA_SINK_TYPE, "VOLUME"); + if (dataSinkType.equalsIgnoreCase("VOLUME")) { + return new VolumeFS(conf); + } else if (dataSinkType.equalsIgnoreCase("OSS")) { + return new OSSFS(conf); + } else if (dataSinkType.equalsIgnoreCase("HDFS")) { + throw new IOException("HDFS as a data sink 
is not supported in ODPS"); + } else { + throw new IOException("Unsupported data sink: " + dataSinkType); + } + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/OSSFS.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/OSSFS.java new file mode 100644 index 000000000000..f58b77b5eb2d --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/OSSFS.java @@ -0,0 +1,182 @@ +package com.alibaba.graphscope.groot.dataload.util; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.aliyun.odps.mapred.TaskContext; +import com.aliyun.odps.mapred.conf.JobConf; +import com.aliyun.oss.*; +import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.UploadFileRequest; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class OSSFS extends AbstractFileSystem { + private static final Logger logger = LoggerFactory.getLogger(OSSFS.class); + + private String ossAccessID; + private String ossAccessKey; + private String endpoint; + private final String bucket; + private final String object; + + private OSS ossClient = null; + + private void initClient() throws IOException { + ClientBuilderConfiguration conf = new ClientBuilderConfiguration(); + conf.setMaxErrorRetry(10); + try { + this.ossClient = + new OSSClientBuilder().build(endpoint, ossAccessID, ossAccessKey, conf); + } catch (OSSException | ClientException oe) { + throw new IOException(oe); + } + } + + public OSSFS(JobConf jobConf) throws IOException { + this.ossAccessID = jobConf.get(DataLoadConfig.OSS_ACCESS_ID); + this.ossAccessKey = jobConf.get(DataLoadConfig.OSS_ACCESS_KEY); + this.endpoint = jobConf.get(DataLoadConfig.OSS_ENDPOINT); + this.bucket = jobConf.get(DataLoadConfig.OSS_BUCKET_NAME); + this.object = jobConf.get(DataLoadConfig.OSS_OBJECT_NAME); + + if (!endpoint.startsWith("https")) { + endpoint = "https://" + endpoint; + } + initClient(); + } + + public OSSFS(Properties properties) throws IOException { + this.ossAccessID = properties.getProperty(DataLoadConfig.OSS_ACCESS_ID); + this.ossAccessKey = properties.getProperty(DataLoadConfig.OSS_ACCESS_KEY); + if (this.ossAccessID == null || this.ossAccessID.isEmpty()) { + String URL = properties.getProperty(DataLoadConfig.OSS_INFO_URL); + HashMap o = getOSSInfoFromURL(URL); + this.ossAccessID = o.get("ossAccessID"); + this.ossAccessKey = o.get("ossAccessKey"); + this.endpoint = o.get("ossEndpoint"); + this.bucket = o.get("ossBucketName"); + this.object = o.get("ossObjectName"); + } else { + this.endpoint = properties.getProperty(DataLoadConfig.OSS_ENDPOINT); + this.bucket = properties.getProperty(DataLoadConfig.OSS_BUCKET_NAME); + this.object = properties.getProperty(DataLoadConfig.OSS_OBJECT_NAME); + } + initClient(); + } + + @Override + public void open(TaskContext context, String mode) throws IOException {} + + public String getQualifiedPath() { + return "oss://"; + } + + public void setJobConf(JobConf jobConf) { + jobConf.set(DataLoadConfig.OSS_ACCESS_ID, ossAccessID); + jobConf.set(DataLoadConfig.OSS_ACCESS_KEY, 
ossAccessKey); + jobConf.set(DataLoadConfig.OSS_ENDPOINT, endpoint); + jobConf.set(DataLoadConfig.OSS_BUCKET_NAME, bucket); + jobConf.set(DataLoadConfig.OSS_OBJECT_NAME, object); + } + + private HashMap<String, String> getOSSInfoFromURL(String URL) throws IOException { + HttpClient client = new HttpClient(); + HttpURLConnection conn = null; + try { + conn = client.createConnection(URL); + conn.setConnectTimeout(5000); + conn.setReadTimeout(5000); + ObjectMapper mapper = new ObjectMapper(); + TypeReference<HashMap<String, String>> typeRef = + new TypeReference<HashMap<String, String>>() {}; + return mapper.readValue(conn.getInputStream(), typeRef); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + } + + public void copy(String srcFile, String dstFile) throws IOException { + uploadFileWithCheckPoint(srcFile, dstFile); + } + + public void putObject(String prefix, String fileName) throws IOException { + String key = Paths.get(object, prefix, fileName).toString(); + try { + ossClient.putObject(bucket, key, new File(fileName)); + } catch (OSSException | ClientException oe) { + logger.error("Error message: {}", oe.getMessage()); + throw new IOException(oe); + } + } + + public void uploadFileWithCheckPoint(String srcFile, String dstFile) throws IOException { + String key = Paths.get(object, dstFile).toString(); + UploadFileRequest uploadFileRequest = new UploadFileRequest(bucket, key); + uploadFileRequest.setUploadFile(srcFile); + uploadFileRequest.setPartSize(10 * 1024 * 1024); + uploadFileRequest.setEnableCheckpoint(true); + try { + ossClient.uploadFile(uploadFileRequest); + } catch (OSSException oe) { + logger.error("Error message: {}", oe.getMessage()); + throw new IOException(oe); + } catch (Throwable ce) { + logger.error("Error Message:" + ce.getMessage()); + throw new IOException(ce); + } + } + + public void createDirectory(String dirName) throws IOException {} + + public Map<String, String> getConfig() { + HashMap<String, String> config = new HashMap<>(); + config.put(DataLoadConfig.OSS_ACCESS_ID, ossAccessID); + config.put(DataLoadConfig.OSS_ACCESS_KEY, ossAccessKey); + config.put(DataLoadConfig.OSS_ENDPOINT, endpoint); + config.put(DataLoadConfig.OSS_BUCKET_NAME, bucket); + config.put(DataLoadConfig.OSS_OBJECT_NAME, object); + return config; + } + + public String readToString(String fileName) throws IOException { + StringBuilder data = new StringBuilder(); + String objName = Paths.get(object, fileName).toString(); + try { + OSSObject ossObject = ossClient.getObject(bucket, objName); + + BufferedReader reader = + new BufferedReader(new InputStreamReader(ossObject.getObjectContent())); + while (true) { + String line = reader.readLine(); + if (line == null) break; + data.append(line); + } + reader.close(); + ossObject.close(); + } catch (OSSException | ClientException oe) { + logger.error("Error Message:" + oe.getMessage()); + throw new IOException(oe); + } + return data.toString(); + } + + public void close() { + if (ossClient != null) { + ossClient.shutdown(); + } + } +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java new file mode 100644 index 000000000000..8bfbc47b5776 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/VolumeFS.java @@ -0,0 +1,182 @@ +package com.alibaba.graphscope.groot.dataload.util; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.aliyun.odps.Odps; +import 
com.aliyun.odps.OdpsException; +import com.aliyun.odps.Volume; +import com.aliyun.odps.account.Account; +import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.data.VolumeInfo; +import com.aliyun.odps.mapred.TaskContext; +import com.aliyun.odps.mapred.conf.JobConf; +import com.aliyun.odps.tunnel.VolumeTunnel; +import com.aliyun.odps.volume.FileSystem; +import com.aliyun.odps.volume.Path; + +import java.io.*; +import java.util.*; + +public class VolumeFS extends AbstractFileSystem { + private String projectName; + private String volumeName; + private String partSpec; + FileSystem fs; + + public VolumeFS(Properties properties) { + projectName = properties.getProperty(DataLoadConfig.ODPS_VOLUME_PROJECT); // Could be null + volumeName = properties.getProperty(DataLoadConfig.ODPS_VOLUME_NAME); + partSpec = properties.getProperty(DataLoadConfig.ODPS_VOLUME_PARTSPEC); + } + + public VolumeFS(JobConf jobConf) throws IOException { + projectName = jobConf.get(DataLoadConfig.ODPS_VOLUME_PROJECT); + volumeName = jobConf.get(DataLoadConfig.ODPS_VOLUME_NAME); + partSpec = jobConf.get(DataLoadConfig.ODPS_VOLUME_PARTSPEC); + } + + public void setJobConf(JobConf jobConf) { + jobConf.set(DataLoadConfig.ODPS_VOLUME_PROJECT, projectName); + jobConf.set(DataLoadConfig.ODPS_VOLUME_NAME, volumeName); + jobConf.set(DataLoadConfig.ODPS_VOLUME_PARTSPEC, partSpec); + } + + public void open(TaskContext context, String mode) throws IOException { + if (mode.equalsIgnoreCase("w")) { + fs = context.getOutputVolumeFileSystem(); + } else { + fs = context.getInputVolumeFileSystem(); + } + } + + public VolumeInfo getVolumeInfo() { + return new VolumeInfo(projectName, volumeName, partSpec, "__default__"); + } + + public void uploadLocalDirToVolumeCompressed(String path, OutputStream stream) + throws IOException { + ZipUtil.compress(path, stream); + } + + public void copy(String srcFile, String dstFile) throws IOException { + DataOutputStream outputStream = fs.create(new Path(dstFile)); + FileInputStream fileInputStream = new FileInputStream(srcFile); + byte[] buffer = new byte[1024]; + while (true) { + int len = fileInputStream.read(buffer); + if (len == -1) { + break; + } + outputStream.write(buffer, 0, len); + } + outputStream.close(); + System.out.println("Copied " + srcFile + " to " + dstFile); + } + + public void createDirectory(String dirName) throws IOException {} + + public Map setConfig(Odps odps) throws IOException { + Account account = odps.getAccount(); + AliyunAccount aliyunAccount = + account instanceof AliyunAccount ? 
((AliyunAccount) account) : null; + if (aliyunAccount == null) { + throw new IOException("Not an AliyunAccount"); + } + HashMap<String, String> config = new HashMap<>(); + config.put(DataLoadConfig.ODPS_ACCESS_ID, aliyunAccount.getAccessId()); + config.put(DataLoadConfig.ODPS_ACCESS_KEY, aliyunAccount.getAccessKey()); + config.put(DataLoadConfig.ODPS_ENDPOINT, odps.getEndpoint()); + config.put(DataLoadConfig.ODPS_VOLUME_PROJECT, projectName); + config.put(DataLoadConfig.ODPS_VOLUME_NAME, volumeName); + config.put(DataLoadConfig.ODPS_VOLUME_PARTSPEC, partSpec); + return config; + } + + public String getQualifiedPath() { + return "volume://"; + } + + public String readToString(String fileName) throws IOException { + DataInputStream dis = fs.open(new Path(fileName)); + String str = dis.readUTF(); + dis.close(); + return str; + } + + public void copyDirectoryRecursively(String path) throws IOException { + List<String> fileSets = globFiles(path); + for (int i = 0; i < fileSets.size(); i++) { + Path dstPath = new Path(fileSets.get(i)); + DataOutputStream outputStream = fs.create(dstPath); + FileInputStream fileInputStream = new FileInputStream(fileSets.get(i)); + byte[] buffer = new byte[1024]; + while (true) { + int len = fileInputStream.read(buffer); + if (len == -1) { + break; + } + outputStream.write(buffer, 0, len); + } + fileInputStream.close(); + outputStream.close(); + } + } + + private void uploadFileTunnel( + Odps odps, + String projectName, + String volumeName, + String partitionName, + String fileName, + String fileContent) + throws Exception { + VolumeTunnel volumeTunnel = new VolumeTunnel(odps); + + String[] files = new String[] {fileName}; + VolumeTunnel.UploadSession uploadSession = + volumeTunnel.createUploadSession(projectName, volumeName, partitionName); + OutputStream outputStream = uploadSession.openOutputStream(fileName); + outputStream.write(fileContent.getBytes()); + outputStream.close(); + uploadSession.commit(files); + } + + public static List<String> globFiles(String path) { + List<String> fileNameSet = new ArrayList<>(); + File file = new File(path); + if (file.isFile()) { + fileNameSet.add(path); + } else { + for (File value : file.listFiles()) { + if (value.isFile()) { + fileNameSet.add(value.getPath()); + } else if (value.isDirectory()) { + fileNameSet.addAll(globFiles(value.getPath())); + } + } + } + + return fileNameSet; + } + + public void createVolumeIfNotExists(Odps odps) throws IOException { + try { + if (!odps.volumes().exists(projectName, volumeName)) { + odps.volumes() + .create( + projectName, + volumeName, + "created by groot data-load-tools", + Volume.Type.OLD, + 7L); + } + } catch (OdpsException e) { + System.out.println( + "Exception during creating volume [" + + getVolumeInfo() + + "] [" + + e.getRequestId() + + "] [" + + e.getErrorCode() + + "]: " + + e.getMessage()); + throw new IOException(e.getMessage()); + } + } + + public void close() {} +} diff --git a/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/ZipUtil.java b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/ZipUtil.java new file mode 100644 index 000000000000..6ab1de77d8c0 --- /dev/null +++ b/interactive_engine/data-load-tool/src/main/java/com/alibaba/graphscope/groot/dataload/util/ZipUtil.java @@ -0,0 +1,60 @@ +package com.alibaba.graphscope.groot.dataload.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.Files; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +public class ZipUtil { + private static final Logger logger = 
LoggerFactory.getLogger(ZipUtil.class); + private static final int COMPRESS_LEVEL = 1; + + public static void compress(String directory, OutputStream outputStream) throws IOException { + logger.info("create zip output stream for {}", directory); + ZipOutputStream zipOut = new ZipOutputStream(new BufferedOutputStream(outputStream)); + zipOut.setLevel(COMPRESS_LEVEL); + try { + compressDirectory(directory, zipOut); + zipOut.finish(); + } finally { + zipOut.close(); + } + } + + private static void compressDirectory(String path, final ZipOutputStream zipOutputStream) + throws IOException { + File file = new File(path); + if (file.isFile()) { + compressFile(path, zipOutputStream); + } else if (file.isDirectory()) { + File[] array = file.listFiles(); + // process empty directory + assert array != null; + if (array.length < 1) { + logger.warn("zipping empty directory for {}!!!", path); + ZipEntry ze = new ZipEntry(path + File.separator); + zipOutputStream.putNextEntry(ze); + zipOutputStream.closeEntry(); + } + for (File value : array) { + if (value.isFile()) { + compressFile(path + File.separator + value.getName(), zipOutputStream); + } else if (value.isDirectory()) { + compressDirectory(path + File.separator + value.getName(), zipOutputStream); + } + } + } + } + + private static void compressFile(String path, final ZipOutputStream zipOutputStream) + throws IOException { + ZipEntry ze = new ZipEntry(path); + zipOutputStream.putNextEntry(ze); + File file = new File(path); + Files.copy(file.toPath(), zipOutputStream); + zipOutputStream.closeEntry(); + } +} diff --git a/interactive_engine/data-load-tool/src/main/scala/com/alibaba/graphscope/groot/dataload/LoadToolSpark.scala b/interactive_engine/data-load-tool/src/main/scala/com/alibaba/graphscope/groot/dataload/LoadToolSpark.scala index 8172c48b0840..cbcdfa4aa0f4 100644 --- a/interactive_engine/data-load-tool/src/main/scala/com/alibaba/graphscope/groot/dataload/LoadToolSpark.scala +++ b/interactive_engine/data-load-tool/src/main/scala/com/alibaba/graphscope/groot/dataload/LoadToolSpark.scala @@ -24,19 +24,16 @@ object LoadToolSpark { .appName("LoadToolSpark") .getOrCreate() - val sc = spark.sparkContext - try { val command = args(0) val configPath = args(1) - val uniquePath = args(2) if ("ingest".equalsIgnoreCase(command)) { - LoadTool.ingest(configPath, true, uniquePath); + LoadTool.ingest(configPath) } else if ("commit".equalsIgnoreCase(command)) { - LoadTool.commit(configPath, true, uniquePath); + LoadTool.commit(configPath) } else if ("ingestAndCommit".equalsIgnoreCase(command)) { - LoadTool.ingest(configPath, true, uniquePath); - LoadTool.commit(configPath, true, uniquePath); + LoadTool.ingest(configPath) + LoadTool.commit(configPath) } else { throw new Exception("supported COMMAND: ingest / commit / ingestAndCommit"); } diff --git a/interactive_engine/executor/assembly/groot/src/store/graph.rs b/interactive_engine/executor/assembly/groot/src/store/graph.rs index 7b93038145f9..a9bc993ee992 100644 --- a/interactive_engine/executor/assembly/groot/src/store/graph.rs +++ b/interactive_engine/executor/assembly/groot/src/store/graph.rs @@ -56,7 +56,7 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa INIT.call_once(|| { if let Some(config_file) = config.get_storage_option("log4rs.config") { log4rs::init_file(config_file, Default::default()).expect("init log4rs failed"); - info!("log4rs inited, config file: {}", config_file); + info!("log4rs init, config file: {}", config_file); } else { println!("No valid 
log4rs.config, rust won't print logs"); } diff --git a/interactive_engine/groot-module/pom.xml b/interactive_engine/groot-module/pom.xml index afc7dd73b122..1cf3bead3e29 100644 --- a/interactive_engine/groot-module/pom.xml +++ b/interactive_engine/groot-module/pom.xml @@ -107,6 +107,17 @@ org.slf4j slf4j-api + + + com.aliyun.odps + odps-sdk-commons + compile + + + com.aliyun.odps + odps-sdk-core + compile + diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java index a4b1a450f27b..3ce365e39f53 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/SnapshotCache.java @@ -30,9 +30,9 @@ public class SnapshotCache { public static final Logger logger = LoggerFactory.getLogger(SnapshotCache.class); - private AtomicReference snapshotWithSchemaRef; + private final AtomicReference snapshotWithSchemaRef; - private TreeMap> snapshotToListeners; + private final TreeMap> snapshotToListeners; public SnapshotCache() { SnapshotWithSchema snapshotWithSchema = SnapshotWithSchema.newBuilder().build(); @@ -65,7 +65,7 @@ public void addListener(long snapshotId, SnapshotListener listener) { * *
We need to decide whether should the writing framework coupled with the implementation of * schema synchronization. Options are discussed here: - * https://yuque.antfin-inc.com/graphscope/project/eibfty#EQGg9 This interface assumes write + * Schema synchronization This interface assumes write * framework isn't coupled with schema synchronization. * * @param snapshotId @@ -92,6 +92,7 @@ public synchronized long advanceQuerySnapshotId(long snapshotId, GraphDef graphD || graphDef.getSchemaVersion() > oldGraphDef.getVersion())) { newSnapshotInfoBuilder.setGraphDef(graphDef); logger.info("schema updated. schema version [" + graphDef.getVersion() + "]"); + logger.info(graphDef.toProto().toString()); } this.snapshotWithSchemaRef.set(newSnapshotInfoBuilder.build()); logger.debug("snapshotId update to [" + snapshotId + "]"); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java index 39c59dea45bf..2d27443d8390 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientService.java @@ -337,6 +337,7 @@ public void ingestData( AtomicInteger counter = new AtomicInteger(storeCount); AtomicBoolean finished = new AtomicBoolean(false); for (int i = 0; i < storeCount; i++) { + logger.info("Store [" + i + "] started to ingest..."); this.storeIngestor.ingest( i, dataPath, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/ddl/AbstractDropTypeExecutor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/ddl/AbstractDropTypeExecutor.java index 6c613c55ca44..8ec094c5f6a6 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/ddl/AbstractDropTypeExecutor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/schema/ddl/AbstractDropTypeExecutor.java @@ -43,13 +43,13 @@ public DdlResult execute(ByteString ddlBlob, GraphDef graphDef, int partitionCou String dstLabel = kind.getDstVertexLabel(); if (srcLabel.equals(label) || dstLabel.equals(label)) { throw new DdlException( - "cannot drop label [ " + "cannot drop label [" + label - + " ], since it has related edgeKinds [ " + + "], since it has related edgeKinds [" + srcLabel - + " ] -> [ " + + "] -> [" + dstLabel - + " ]"); + + "]"); } } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java index 4ee6215216d1..fb7b0e456c54 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreIngestService.java @@ -60,7 +60,7 @@ public void storeClearIngest( StoreClearIngestRequest request, StreamObserver responseObserver) { try { - this.storeService.clearIngest(); + this.storeService.clearIngest(request.getDataPath()); responseObserver.onNext(StoreClearIngestResponse.newBuilder().build()); responseObserver.onCompleted(); } catch (IOException e) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java 
b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 2cf5fc8a4a53..a0aa689f389b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -60,17 +60,16 @@ public class StoreService implements MetricsAgent { private static final String PARTITION_WRITE_PER_SECOND_MS = "partition.write.per.second.ms"; - private Configs storeConfigs; - private int storeId; - private int writeThreadCount; - private int downloadThreadCount; - private MetaService metaService; + private final Configs storeConfigs; + private final int storeId; + private final int writeThreadCount; + private final MetaService metaService; private Map idToPartition; private ExecutorService writeExecutor; private ExecutorService ingestExecutor; private ExecutorService garbageCollectExecutor; - private ExecutorService downloadExecutor; - private boolean enableGc; + private ThreadPoolExecutor downloadExecutor; + private final boolean enableGc; private volatile boolean shouldStop = true; private volatile long lastUpdateTime; @@ -128,16 +127,16 @@ public void start() throws IOException { ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-garbage-collect", logger)); logger.info("StoreService started. storeId [" + this.storeId + "]"); - this.downloadThreadCount = 8; this.downloadExecutor = new ThreadPoolExecutor( - 0, - downloadThreadCount, - 0L, + 16, + 16, + 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-download", logger)); + this.downloadExecutor.allowCoreThreadTimeOut(true); logger.info("StoreService started. storeId [" + this.storeId + "]"); } @@ -304,7 +303,7 @@ public void ingestData( String path, Map config, CompletionCallback callback) { String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); String downloadPath = Paths.get(dataRoot, "download").toString(); - String[] items = path.split("\\/"); + String[] items = path.split("/"); // Get the unique path (uuid) String unique_path = items[items.length - 1]; Path uniquePath = Paths.get(downloadPath, unique_path); @@ -319,8 +318,8 @@ public void ingestData( } this.ingestExecutor.execute( () -> { + logger.info("ingesting data [{}]", path); try { - logger.info("ingesting data [{}]", path); ingestDataInternal(path, config, callback); } catch (Exception e) { logger.error("ingest data failed. 
path [" + path + "]", e); @@ -348,29 +347,38 @@ private void ingestDataInternal( try { partition.ingestExternalFile(externalStorage, fullPath); } catch (Exception e) { + logger.error("ingest external file failed.", e); if (!finished.getAndSet(true)) { callback.onError(e); } } if (counter.decrementAndGet() == 0) { + logger.info("All download tasks finished."); finished.set(true); callback.onCompleted(null); + } else { + logger.info(counter.get() + " download tasks remaining"); } }); } } - public void clearIngest() throws IOException { + public void clearIngest(String dataPath) throws IOException { String dataRoot = StoreConfig.STORE_DATA_PATH.get(storeConfigs); - Path downloadPath = Paths.get(dataRoot, "download"); + if (dataPath == null || dataPath.isEmpty()) { + logger.warn("Must set a sub-path for clearing."); + return; + } + + Path downloadPath = Paths.get(dataRoot, "download", dataPath); try { logger.info("Clearing directory {}", downloadPath); FileUtils.forceDelete(downloadPath.toFile()); - } catch (FileNotFoundException fnfe) { + } catch (FileNotFoundException e) { // Ignore } logger.info("cleared directory {}", downloadPath); - Files.createDirectories(downloadPath); + // Files.createDirectories(downloadPath); } public void garbageCollect(long snapshotId, CompletionCallback callback) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java index c0cca9c1966d..938482232fb0 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/ExternalStorage.java @@ -1,10 +1,21 @@ package com.alibaba.graphscope.groot.store.external; +import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Map; public abstract class ExternalStorage { + private static final Logger logger = LoggerFactory.getLogger(ExternalStorage.class); + public static ExternalStorage getStorage(String path, Map config) throws IOException { URI uri = URI.create(path); @@ -14,11 +25,67 @@ public static ExternalStorage getStorage(String path, Map config return new HdfsStorage(path); case "oss": return new OssStorage(path, config); + case "volume": + return new VolumeStorage(path, config); default: throw new IllegalArgumentException( "external storage scheme [" + scheme + "] not supported"); } } - public abstract void downloadData(String srcPath, String dstPath) throws IOException; + public abstract void downloadDataSimple(String srcPath, String dstPath) throws IOException; + + public void downloadData(String srcPath, String dstPath) throws IOException { + // Check chk + String chkPath = srcPath.substring(0, srcPath.length() - ".sst".length()) + ".chk"; + String chkLocalPath = dstPath.substring(0, dstPath.length() - ".sst".length()) + ".chk"; + + downloadDataSimple(chkPath, chkLocalPath); + File chkFile = new File(chkLocalPath); + byte[] chkData = new byte[(int) chkFile.length()]; + try { + FileInputStream fis = new FileInputStream(chkFile); + fis.read(chkData); + fis.close(); + } catch (FileNotFoundException 
e) { + throw new IOException(e); + } + String[] chkArray = new String(chkData).split(","); + if ("0".equals(chkArray[0])) { + return; + } + String chkMD5Value = chkArray[1]; + downloadDataSimple(srcPath, dstPath); + String sstMD5Value = getFileMD5(dstPath); + if (!chkMD5Value.equals(sstMD5Value)) { + logger.error("Checksum failed for " + chkLocalPath + " versus " + dstPath); + logger.error("Expect [" + chkMD5Value + "], got [" + sstMD5Value + "]"); + throw new IOException("CheckSum failed for " + srcPath); + } else { + // The .chk file are now useless + chkFile.delete(); + } + } + + public String getFileMD5(String fileName) throws IOException { + FileInputStream fis = null; + try { + MessageDigest MD5 = MessageDigest.getInstance("MD5"); + fis = new FileInputStream(fileName); + byte[] buffer = new byte[8192]; + int length; + while ((length = fis.read(buffer)) != -1) { + MD5.update(buffer, 0, length); + } + return new String(Hex.encodeHex(MD5.digest())); + } catch (NoSuchAlgorithmException e) { + throw new IOException(e); + } catch (IOException e) { + throw e; + } finally { + if (fis != null) { + fis.close(); + } + } + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/HdfsStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/HdfsStorage.java index d0a97352f528..bc8ec066fce7 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/HdfsStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/HdfsStorage.java @@ -3,10 +3,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; public class HdfsStorage extends ExternalStorage { + private static final Logger logger = LoggerFactory.getLogger(HdfsStorage.class); private FileSystem fs; @@ -17,7 +20,15 @@ public HdfsStorage(String path) throws IOException { } @Override + public void downloadDataSimple(String srcPath, String dstPath) throws IOException { + if (fs.exists(new Path(srcPath))) { + fs.copyToLocalFile(new Path(srcPath), new Path(dstPath)); + } else { + logger.warn("Path doesn't exists: " + srcPath); + } + } + public void downloadData(String srcPath, String dstPath) throws IOException { - fs.copyToLocalFile(new Path(srcPath), new Path(dstPath)); + downloadDataSimple(srcPath, dstPath); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java index c1b3c606f497..6d1b4b2f208d 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/OssStorage.java @@ -1,31 +1,50 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + *
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *
Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ package com.alibaba.graphscope.groot.store.external; +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; import com.aliyun.oss.model.GetObjectRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; -import java.net.URI; +import java.nio.file.Paths; import java.util.Map; public class OssStorage extends ExternalStorage { - - private OSS ossClient; + private static final Logger logger = LoggerFactory.getLogger(OssStorage.class); + private final OSS ossClient; + private String bucket; + private String rootPath; public OssStorage(String path, Map config) { - URI uri = URI.create(path); - String endpoint = uri.getAuthority(); - String accessID = config.get("ossAccessID"); - String accessKey = config.get("ossAccessKey"); + String endpoint = config.get(DataLoadConfig.OSS_ENDPOINT); + String accessID = config.get(DataLoadConfig.OSS_ACCESS_ID); + String accessKey = config.get(DataLoadConfig.ODPS_ACCESS_KEY); + bucket = config.get(DataLoadConfig.OSS_BUCKET_NAME); + rootPath = config.get(DataLoadConfig.OSS_OBJECT_NAME); this.ossClient = new OSSClientBuilder().build(endpoint, accessID, accessKey); } @Override - public void downloadData(String srcPath, String dstPath) { - URI uri = URI.create(srcPath); - String[] pathItems = uri.getPath().split("/", 3); - String bucketName = pathItems[1]; - String objectName = pathItems[2]; - ossClient.getObject(new GetObjectRequest(bucketName, objectName), new File(dstPath)); + public void downloadDataSimple(String srcPath, String dstPath) { + logger.info("Downloading " + srcPath + " to " + dstPath); + String[] pathItems = srcPath.split("://"); + String objectFullName = Paths.get(rootPath, pathItems[1]).toString(); + ossClient.getObject(new GetObjectRequest(bucket, objectFullName), new File(dstPath)); } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/VolumeStorage.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/VolumeStorage.java new file mode 100644 index 000000000000..4357ae442955 --- /dev/null +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/external/VolumeStorage.java @@ -0,0 +1,65 @@ +package com.alibaba.graphscope.groot.store.external; + +import com.alibaba.graphscope.groot.common.config.DataLoadConfig; +import com.aliyun.odps.Odps; +import com.aliyun.odps.account.AliyunAccount; +import com.aliyun.odps.tunnel.TunnelException; +import com.aliyun.odps.tunnel.VolumeTunnel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; + +public class VolumeStorage extends ExternalStorage { + private static final Logger logger = LoggerFactory.getLogger(VolumeStorage.class); + VolumeTunnel tunnel; + String projectName; + String volumeName; + String partSpec; + + public VolumeStorage(String path, Map config) { + String endpoint = config.get(DataLoadConfig.ODPS_ENDPOINT); + String accessID = config.get(DataLoadConfig.ODPS_ACCESS_ID); + String accessKey = config.get(DataLoadConfig.ODPS_ACCESS_KEY); + 
AliyunAccount account = new AliyunAccount(accessID, accessKey); + + Odps odps = new Odps(account); + odps.setEndpoint(endpoint); + tunnel = new VolumeTunnel(odps); + + projectName = config.get(DataLoadConfig.ODPS_VOLUME_PROJECT); + volumeName = config.get(DataLoadConfig.ODPS_VOLUME_NAME); + partSpec = config.get(DataLoadConfig.ODPS_VOLUME_PARTSPEC); + } + + @Override + public void downloadDataSimple(String srcPath, String dstPath) throws IOException { + logger.info("Downloading " + srcPath + " to " + dstPath); + long start = System.currentTimeMillis(); + String[] pathItems = srcPath.split("://"); + String fileName = pathItems[1]; + // Read data from the input stream and write it to the output stream. + byte[] buffer = new byte[1024]; + int bytesRead; + VolumeTunnel.DownloadSession session; + try { + session = tunnel.createDownloadSession(projectName, volumeName, partSpec, fileName); + try (InputStream inputStream = session.openInputStream()) { + try (OutputStream outputStream = Files.newOutputStream(Paths.get(dstPath))) { + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + } + } + } catch (TunnelException e) { + throw new IOException(e); + } + long finish = System.currentTimeMillis(); + long timeElapsed = finish - start; + logger.info("Downloaded " + srcPath + " in " + timeElapsed + " ms."); + } +} diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java index c57c67938062..242c2ae757d4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java @@ -22,20 +22,13 @@ import com.alibaba.graphscope.proto.groot.GraphDefPb; import com.sun.jna.Pointer; -import org.apache.commons.codec.binary.Hex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; public class JnaGraphStore implements GraphPartition { private static final Logger logger = LoggerFactory.getLogger(JnaGraphStore.class); @@ -110,74 +103,11 @@ public GraphDefPb getGraphDefBlob() throws IOException { @Override public void ingestExternalFile(ExternalStorage storage, String sstPath) throws IOException { - String[] items = sstPath.split("\\/"); + String[] items = sstPath.split("/"); String unique_path = items[items.length - 2]; String sstName = sstPath.substring(sstPath.lastIndexOf('/') + 1); String sstLocalPath = downloadPath.toString() + "/" + unique_path + "/" + sstName; - URI uri = URI.create(sstPath); - String scheme = uri.getScheme(); - if ("oss".equals(scheme)) { - String chkPath = sstPath.substring(0, sstPath.length() - ".sst".length()) + ".chk"; - String chkName = sstName.substring(0, sstName.length() - ".sst".length()) + ".chk"; - String chkLocalPath = downloadPath.toString() + "/" + unique_path + "/" + chkName; - - storage.downloadData(chkPath, chkLocalPath); - File chkFile = new File(chkLocalPath); - byte[] chkData = new byte[(int) chkFile.length()]; - try { - FileInputStream fis = new FileInputStream(chkFile); - fis.read(chkData); - 
fis.close(); - } catch (FileNotFoundException e) { - throw new IOException(e); - } catch (IOException e) { - throw e; - } - String[] chkArray = new String(chkData).split(","); - if ("0".equals(chkArray[0])) { - return; - } - String chkMD5Value = chkArray[1]; - storage.downloadData(sstPath, sstLocalPath); - String sstMD5Value = getFileMD5(sstLocalPath); - if (!chkMD5Value.equals(sstMD5Value)) { - throw new IOException("CheckSum failed for " + sstPath); - } else { - // The .chk file are now useless - chkFile.delete(); - } - } else if ("hdfs".equals(scheme)) { - storage.downloadData(sstPath, sstLocalPath); - } else { - throw new IllegalArgumentException( - "external storage scheme [" + scheme + "] not supported"); - } - } - - public String getFileMD5(String fileName) throws IOException { - FileInputStream fis = null; - try { - MessageDigest MD5 = MessageDigest.getInstance("MD5"); - fis = new FileInputStream(fileName); - byte[] buffer = new byte[8192]; - int length; - while ((length = fis.read(buffer)) != -1) { - MD5.update(buffer, 0, length); - } - return new String(Hex.encodeHex(MD5.digest())); - } catch (NoSuchAlgorithmException e) { - throw new IOException(e); - } catch (IOException e) { - throw e; - } finally { - try { - if (fis != null) { - fis.close(); - } - } catch (IOException e) { - throw e; - } - } + storage.downloadData(sstPath, sstLocalPath); } @Override diff --git a/interactive_engine/pom.xml b/interactive_engine/pom.xml index 939bebc42ebb..d7d6980bbcb0 100644 --- a/interactive_engine/pom.xml +++ b/interactive_engine/pom.xml @@ -20,6 +20,9 @@ graphscope + + true + v6d @@ -53,24 +56,12 @@ - default - - true - + groot-data-load - assembly common data-load-tool - executor - frontend - groot-module - groot-server - lgraph sdk sdk-common - tests - executor/engine/pegasus/clients/java/client - compiler @@ -98,6 +89,7 @@ 3.21.12 3.21.12 1.51.1 + 4.1.79.Final 5.7.0 4.13.2 5.6.3 @@ -119,12 +111,15 @@ 2.11.0 1.32.0 2.9.3 - - 4.4.1 4.0.2 4.9.1 - 3.14.1 + + 3.16.3 + + 4.5.13 + 0.36.4-public + 3.3.8-public 2.12.10 2.12 3.1.1 @@ -149,12 +144,6 @@ - - com.alibaba - fastjson - 2.0.20 - - com.alibaba.graphscope interactive-common @@ -299,7 +288,7 @@ io.netty netty-all - 4.1.79.Final + ${netty.version} @@ -330,17 +319,29 @@ ${odps.sdk.public.version} + + com.aliyun.odps + odps-sdk-core + ${odps.sdk.public.version} + + com.aliyun.odps odps-sdk-mapred ${odps.sdk.public.version} + + com.aliyun.odps + cupid-sdk + ${cupid.sdk.version} + + com.fasterxml.jackson jackson-bom - 2.14.1 + ${jackson.version} import pom @@ -356,7 +357,7 @@ io.grpc grpc-bom - 1.51.1 + ${grpc.version} import pom @@ -667,6 +668,20 @@ + + org.apache.maven.plugins + maven-help-plugin + 3.1.0 + + + show-profiles + compile + + active-profiles + + + + diff --git a/interactive_engine/proto/sdk/client.proto b/interactive_engine/proto/sdk/client.proto index 5868c90fe219..805a85a163e3 100644 --- a/interactive_engine/proto/sdk/client.proto +++ b/interactive_engine/proto/sdk/client.proto @@ -153,6 +153,7 @@ message GetLoggerInfoResponse { } message ClearIngestRequest { + string dataPath = 1; } message ClearIngestResponse { diff --git a/interactive_engine/proto/store_ingest_service.proto b/interactive_engine/proto/store_ingest_service.proto index 544cc94f5b0e..e8c27a968320 100644 --- a/interactive_engine/proto/store_ingest_service.proto +++ b/interactive_engine/proto/store_ingest_service.proto @@ -33,7 +33,7 @@ message StoreIngestResponse { } message StoreClearIngestRequest { - + string dataPath = 1; } message StoreClearIngestResponse { diff 
--git a/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/compiler/schema/DefaultGraphSchema.java b/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/compiler/schema/DefaultGraphSchema.java index 4cfc00832543..a50561d2cb33 100644 --- a/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/compiler/schema/DefaultGraphSchema.java +++ b/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/compiler/schema/DefaultGraphSchema.java @@ -71,7 +71,7 @@ public GraphElement getElement(int labelId) throws GraphElementNotFoundException return getElement(idToLabelList.get(labelId)); } - throw new GraphElementNotFoundException("label not exist for labelid " + labelId); + throw new GraphElementNotFoundException("label not exist for label ID " + labelId); } @Override diff --git a/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/sdkcommon/schema/PropertyValue.java b/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/sdkcommon/schema/PropertyValue.java index 6199f8567bb7..9919efa328c6 100644 --- a/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/sdkcommon/schema/PropertyValue.java +++ b/interactive_engine/sdk-common/src/main/java/com/alibaba/graphscope/sdkcommon/schema/PropertyValue.java @@ -98,6 +98,9 @@ public Object getValue() { if (valObject != null) { return valObject; } + if (dataType == DataType.UNKNOWN) { + return null; + } this.valObject = SerdeUtils.bytesToObject(this.dataType, this.valBytes); return valObject; } diff --git a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 4a07b130063d..fbe04071d6c6 100644 --- a/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/sdk/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -178,7 +178,9 @@ public void ingestData(String path) { public void ingestData(String path, Map config) { IngestDataRequest.Builder builder = IngestDataRequest.newBuilder(); builder.setDataPath(path); - builder.putAllConfig(config); + if (config != null) { + builder.putAllConfig(config); + } this.stub.ingestData(builder.build()); } @@ -258,8 +260,8 @@ public void close() { this.gremlinClient.close(); } - public void clearIngest() { - this.stub.clearIngest(ClearIngestRequest.newBuilder().build()); + public void clearIngest(String dataPath) { + this.stub.clearIngest(ClearIngestRequest.newBuilder().setDataPath(dataPath).build()); } public static GrootClientBuilder newBuilder() { diff --git a/k8s/dockerfiles/graphscope-store.Dockerfile b/k8s/dockerfiles/graphscope-store.Dockerfile index a54d52495a19..3697c5c82a52 100644 --- a/k8s/dockerfiles/graphscope-store.Dockerfile +++ b/k8s/dockerfiles/graphscope-store.Dockerfile @@ -16,7 +16,7 @@ USER graphscope RUN cd /home/graphscope/graphscope \ && . 
~/.graphscope_env \ && cd /home/graphscope/graphscope/interactive_engine \ - && mvn clean package -P groot,groot-assembly -DskipTests --quiet -Drust.compile.mode="$profile" \ + && mvn clean package -P groot -DskipTests --quiet -Drust.compile.mode="$profile" \ && tar xzf /home/graphscope/graphscope/interactive_engine/assembly/target/groot.tar.gz -C /home/graphscope/ FROM ubuntu:22.04 diff --git a/python/graphscope/framework/graph_schema.py b/python/graphscope/framework/graph_schema.py index 71addc26e8c9..7fa9998d65d7 100644 --- a/python/graphscope/framework/graph_schema.py +++ b/python/graphscope/framework/graph_schema.py @@ -555,12 +555,16 @@ def add_edge_label(self, label, src_label=None, dst_label=None, properties=None) def drop(self, label, src_label=None, dst_label=None): for item in self._vertex_labels: if label == item.label: + if src_label is not None or dst_label is not None: + raise ValueError( + "Vertex label should not have source and destination." + ) self._vertex_labels_to_drop.append(VertexLabel(label)) return for item in self._edge_labels: if label == item.label: label_to_drop = EdgeLabel(label) - if src_label and dst_label: + if src_label is not None and dst_label is not None: label_to_drop.source(src_label).destination(dst_label) self._edge_labels_to_drop.append(label_to_drop) return
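
For context, the client-side flow changed by this patch can be sketched as follows. This is a minimal, illustrative example and not part of the patch: it assumes an already-constructed `GrootClient`, a backend-specific `config` map (e.g. the `DataLoadConfig.OSS_*` / `ODPS_*` entries the stores use to build an `ExternalStorage`), and a data-build output placed under a unique (uuid) directory; the commit step between ingest and clear is omitted.

```java
import com.alibaba.graphscope.groot.sdk.GrootClient;

import java.util.Map;

public class IngestAndClearSketch {
    /**
     * Hypothetical driver: 'client' is an already-built GrootClient, 'dataPath' is the
     * build output location ("<scheme>://.../<uuid>"), and 'config' carries the storage
     * credentials the stores need to construct an ExternalStorage for that scheme.
     */
    public static void ingestAndClear(GrootClient client, String dataPath, Map<String, String> config) {
        // Each store downloads the SSTs of its partitions into <dataRoot>/download/<uuid>/,
        // verifying each downloaded .sst against the MD5 recorded in its companion .chk file.
        client.ingestData(dataPath, config);

        // ... commit the loaded data and wait for the new snapshot to become queryable ...

        // With this patch, clearIngest takes the unique sub-path, so only this load's
        // download directory is removed rather than the whole <dataRoot>/download tree.
        String uniquePath = dataPath.substring(dataPath.lastIndexOf('/') + 1);
        client.clearIngest(uniquePath);
    }
}
```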