Skip to content
Permalink
Browse files
feat: introduce HugeGraphSparkLoader (#282)
  • Loading branch information
simon824 committed May 27, 2022
1 parent f6e6a74 commit 9ee7abb2ee397db636e4c1a1f258f0c7bcd1b5c1
Showing 11 changed files with 343 additions and 9 deletions.
@@ -0,0 +1,64 @@
#!/bin/bash
#
# Launches HugeGraphSparkLoader through spark-submit.
#
# Options:
#   -m, --master <url>        Spark master (required)
#   -e, --deploy-mode <mode>  Spark deploy mode (required)
#   -n, --name <name>         application name (default: hugegraph-spark-loader)
#   -c, --conf <key=value>    extra Spark setting, forwarded as "--conf"; repeatable
#   --                        stop option parsing; the rest goes to the loader
# Every other argument is forwarded to the loader unchanged.
#
# MASTER, DEPLOY_MODE, APP_NAME and SPARK_CONFIG are deliberately not reset
# here, so values already exported by the caller keep acting as defaults.

# Print a short usage summary to stderr.
usage() {
    cat >&2 <<EOF
Usage: $(basename "$0") -m <master> -e <deploy-mode> [-n <name>]
       [-c <key=value>]... [--] [loader arguments...]
  -m, --master        Spark master URL (required)
  -e, --deploy-mode   Spark deploy mode (required)
  -n, --name          Spark application name
  -c, --conf          extra Spark configuration, may be repeated
EOF
}

# Abort when the option in $1 is not followed by a value.
# Called with the remaining argument list, so "$#" is the number of args left.
# Without this guard "shift 2" fails without shifting and the loop never ends.
require_value() {
    if (( $# < 2 )); then
        echo "Error: option '$1' requires a value" >&2
        usage
        exit 1
    fi
}

# Arrays keep every argument as a single word, even when it contains spaces.
LOADER_ARGS=()
SPARK_CONF_ARGS=()
if [ -n "${SPARK_CONFIG:-}" ]; then
    # Honor a pre-set SPARK_CONFIG string (split on whitespace, no globbing).
    read -r -a SPARK_CONF_ARGS <<< "${SPARK_CONFIG}"
fi

while (( $# )); do
    case "$1" in
        -m|--master)
            require_value "$@"
            MASTER=$2
            shift 2
            ;;

        -n|--name)
            require_value "$@"
            APP_NAME=$2
            shift 2
            ;;

        -e|--deploy-mode)
            require_value "$@"
            DEPLOY_MODE=$2
            shift 2
            ;;

        -c|--conf)
            require_value "$@"
            SPARK_CONF_ARGS+=(--conf "$2")
            shift 2
            ;;

        --) # end argument parsing; everything left belongs to the loader
            shift
            LOADER_ARGS+=("$@")
            break
            ;;

        *) # preserve positional arguments
            LOADER_ARGS+=("$1")
            shift
            ;;
    esac
done

if [ -z "${MASTER:-}" ] || [ -z "${DEPLOY_MODE:-}" ]; then
    echo "Error: The following options are required:
[-e | --deploy-mode], [-m | --master]" >&2
    usage
    # Non-zero so that callers and schedulers can detect the failure.
    exit 1
fi

if [ -z "${SPARK_HOME:-}" ]; then
    echo "Error: SPARK_HOME is not set, cannot locate bin/spark-submit" >&2
    exit 1
fi

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
APP_DIR=$(dirname "${BIN_DIR}")
LIB_DIR=${APP_DIR}/lib

# The pattern is quoted so that find, not the shell, performs the matching.
# Only the first match is used as the application jar.
ASSEMBLY_JAR=$(find "${LIB_DIR}" -name 'hugegraph-loader*.jar' | head -n 1)
if [ -z "${ASSEMBLY_JAR}" ]; then
    echo "Error: no hugegraph-loader*.jar found under ${LIB_DIR}" >&2
    exit 1
fi

DEFAULT_APP_NAME="hugegraph-spark-loader"
APP_NAME=${APP_NAME:-$DEFAULT_APP_NAME}

CMD=("${SPARK_HOME}/bin/spark-submit"
     --name "${APP_NAME}"
     --master "${MASTER}"
     --deploy-mode "${DEPLOY_MODE}"
     --class com.baidu.hugegraph.loader.spark.HugeGraphSparkLoader
     "${SPARK_CONF_ARGS[@]}")

# spark-submit expects --jars as one comma-separated list.
shopt -s nullglob
LIB_JARS=("${LIB_DIR}"/*.jar)
shopt -u nullglob
if (( ${#LIB_JARS[@]} )); then
    CMD+=(--jars "$(IFS=,; echo "${LIB_JARS[*]}")")
fi

CMD+=("${ASSEMBLY_JAR}" "${LOADER_ARGS[@]}")

echo "${CMD[*]}"
exec "${CMD[@]}"
@@ -36,6 +36,17 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>

<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-client</artifactId>
@@ -20,6 +20,7 @@
package com.baidu.hugegraph.loader.executor;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@@ -38,7 +39,7 @@
import com.baidu.hugegraph.structure.constant.GraphMode;
import com.baidu.hugegraph.util.Log;

public final class LoadContext {
public final class LoadContext implements Serializable {

private static final Logger LOG = Log.logger(LoadContext.class);

@@ -20,6 +20,7 @@
package com.baidu.hugegraph.loader.executor;

import java.io.File;
import java.io.Serializable;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
@@ -35,7 +36,7 @@
import com.beust.jcommander.ParameterException;
import com.google.common.collect.ImmutableSet;

public class LoadOptions {
public class LoadOptions implements Serializable {

private static final Logger LOG = Log.logger(LoadOptions.class);

@@ -19,6 +19,7 @@

package com.baidu.hugegraph.loader.mapping;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,7 +38,7 @@
import com.google.common.collect.ImmutableSet;

@JsonPropertyOrder({"label", "skip"})
public abstract class ElementMapping implements Checkable {
public abstract class ElementMapping implements Checkable, Serializable {

@JsonProperty("label")
private String label;
@@ -55,6 +56,8 @@ public abstract class ElementMapping implements Checkable {
private Set<Object> nullValues;
@JsonProperty("update_strategies")
private Map<String, UpdateStrategy> updateStrategies;
@JsonProperty("batch_size")
private long batchSize;

public ElementMapping() {
this.skip = false;
@@ -64,6 +67,7 @@ public ElementMapping() {
this.ignoredFields = new HashSet<>();
this.nullValues = ImmutableSet.of(Constants.EMPTY_STR);
this.updateStrategies = new HashMap<>();
this.batchSize = 1000;
}

public abstract ElemType type();
@@ -173,6 +177,10 @@ public Object mappingValue(String fieldName, String rawValue) {
return mappingValue;
}

/**
 * Returns the batch size held by this mapping.
 * <p>
 * The backing field is bound to the JSON property {@code batch_size} and is
 * initialized to 1000 by the no-argument constructor.
 * NOTE(review): the unit (presumably elements per batch) is not visible
 * here; confirm against the callers.
 *
 * @return the current value of the {@code batchSize} field
 */
public long batchSize() {
return this.batchSize;
}

/**
 * Returns the selected-fields set of this mapping.
 * <p>
 * The field reference itself is returned, not a copy.
 *
 * @return the {@code selectedFields} field
 */
public Set<String> selectedFields() {
return this.selectedFields;
}
@@ -19,6 +19,7 @@

package com.baidu.hugegraph.loader.mapping;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@@ -33,7 +34,7 @@
import com.google.common.collect.ImmutableList;

@JsonPropertyOrder({"id", "skip", "input", "vertices", "edges"})
public class InputStruct implements Checkable {
public class InputStruct implements Checkable, Serializable {

public static final InputStruct EMPTY = new InputStruct(ImmutableList.of(),
ImmutableList.of());
@@ -19,6 +19,7 @@

package com.baidu.hugegraph.loader.source;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
@@ -29,7 +30,7 @@
import com.baidu.hugegraph.util.E;
import com.fasterxml.jackson.annotation.JsonProperty;

public abstract class AbstractSource implements InputSource {
public abstract class AbstractSource implements InputSource, Serializable {

@JsonProperty("header")
private String[] header;
@@ -19,14 +19,15 @@

package com.baidu.hugegraph.loader.source.file;

import java.io.Serializable;
import java.util.Set;

import org.apache.commons.io.FilenameUtils;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;

public class FileFilter {
public class FileFilter implements Serializable {

private static final String ALL_EXTENSION = "*";

@@ -19,13 +19,14 @@

package com.baidu.hugegraph.loader.source.file;

import java.io.Serializable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.baidu.hugegraph.loader.constant.Constants;
import com.fasterxml.jackson.annotation.JsonProperty;

public class SkippedLine {
public class SkippedLine implements Serializable {

@JsonProperty("regex")
private String regex;

0 comments on commit 9ee7abb

Please sign in to comment.