feat: Introduce HugeGraphFlinkCDCLoader (#291)
simon824 committed Jun 21, 2022
1 parent 0bbb1d1 commit 87869a1d94b8e19713a232ec4ced99c201e07763
Showing 12 changed files with 612 additions and 59 deletions.
@@ -0,0 +1,24 @@
#!/bin/bash
function get_params() {
echo "params: $*"
engine_params=""
hugegraph_params=""
while (("$#")); do
case "$1" in
--file | --graph | --schema | --host | --port | --username | --token | --protocol | \
--trust-store-file | --trust-store-password | --clear-all-data | --clear-timeout | \
--incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
--max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
--timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
--print-progress | --dry-run | --help)
hugegraph_params="$hugegraph_params $1 $2"
shift 2
;;

*) # preserve positional arguments
engine_params="$engine_params $1"
shift
;;
esac
done
}
@@ -0,0 +1,20 @@
#!/bin/bash

BIN_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR=$(dirname ${BIN_DIR})
LIB_DIR=${APP_DIR}/lib
assemblyJarName=$(find ${LIB_DIR} -name hugegraph-loader*.jar)

# get hugegraph_params and engine_params
source "$BIN_DIR"/get_params.sh
get_params $*
echo "engine_params: $engine_params"
echo "hugegraph_params: $hugegraph_params"

CMD="${FLINK_HOME}/bin/flink run \
${engine_params} \
-c com.baidu.hugegraph.loader.flink.HugeGraphFlinkCDCLoader \
${assemblyJarName} ${hugegraph_params}"

echo ${CMD}
exec ${CMD}
@@ -1,64 +1,24 @@
#!/bin/bash

PARAMS=""
while (( "$#" )); do
case "$1" in
-m|--master)
MASTER=$2
shift 2
;;

-n|--name)
APP_NAME=$2
shift 2
;;

-e|--deploy-mode)
DEPLOY_MODE=$2
shift 2
;;

-c|--conf)
SPARK_CONFIG=${SPARK_CONFIG}" --conf "$2
shift 2
;;

--) # end argument parsing
shift
break
;;

*) # preserve positional arguments
PARAMS="$PARAMS $1"
shift
;;

esac
done

if [ -z ${MASTER} ] || [ -z ${DEPLOY_MODE} ]; then
echo "Error: The following options are required:
[-e | --deploy-mode], [-m | --master]"
usage
exit 0
fi

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BIN_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR=$(dirname ${BIN_DIR})
LIB_DIR=${APP_DIR}/lib

# get hugegraph_params and engine_params
source "$BIN_DIR"/get_params.sh
get_params $*
echo "engine_params: $engine_params"
echo "hugegraph_params: $hugegraph_params"

assemblyJarName=$(find ${LIB_DIR} -name hugegraph-loader*.jar)

DEFAULT_APP_NAME="hugegraph-spark-loader"
APP_NAME=${APP_NAME:-$DEFAULT_APP_NAME}

CMD="${SPARK_HOME}/bin/spark-submit
--name ${APP_NAME} \
--master ${MASTER} \
--deploy-mode ${DEPLOY_MODE} \
--class com.baidu.hugegraph.loader.spark.HugeGraphSparkLoader \
${SPARK_CONFIG}
--jars $(echo ${LIB_DIR}/*.jar | tr ' ' ',') ${assemblyJarName} ${PARAMS}"
${engine_params}
--jars $(echo ${LIB_DIR}/*.jar | tr ' ' ',') ${assemblyJarName} ${hugegraph_params}"

echo ${CMD}
exec ${CMD}
@@ -38,13 +38,55 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>antlr4-runtime</artifactId>
<groupId>org.antlr</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
@@ -176,7 +218,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
@@ -76,4 +76,7 @@ public final class Constants {
public static final int VERTEX_ID_LIMIT = 128;
public static final String[] SEARCH_LIST = new String[]{":", "!"};
public static final String[] TARGET_LIST = new String[]{"`:", "`!"};

public static final String CDC_DATA = "data";
public static final String CDC_OP = "op";
}
@@ -134,6 +134,14 @@ public class LoadOptions implements Serializable {
description = "The number of lines in each submit")
public int batchSize = 500;

@Parameter(names = {"--cdc-flush-interval"}, arity = 1,
description = "The flush interval for flink cdc")
public int flushIntervalMs = 30000;

@Parameter(names = {"--cdc-sink-parallelism"}, arity = 1,
description = "The sink parallelism for flink cdc")
public int sinkParallelism = 1;

@Parameter(names = {"--shutdown-timeout"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The timeout of awaitTermination in seconds")
@@ -0,0 +1,88 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.loader.flink;

import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;

import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.util.Log;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {

private static final Logger LOG = Log.logger(HugeGraphDeserialization.class);

@Override
public void deserialize(SourceRecord sourceRecord,
Collector<String> collector) throws Exception {
ObjectMapper mapper = new ObjectMapper();
ObjectNode result = mapper.createObjectNode();

Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String op = operation.code();
Struct value = (Struct) sourceRecord.value();
Struct data;
switch (operation) {
case DELETE:
data = value.getStruct("before");
break;
case CREATE:
case READ:
case UPDATE:
data = value.getStruct("after");
break;
default:
throw new IllegalArgumentException(
"The type of `op` should be 'c' 'r' 'u' 'd' only");
}
ObjectNode rootNode = mapper.createObjectNode();
if (data != null) {
Schema afterSchema = data.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = data.get(field);
rootNode.put(field.name(), afterValue.toString());
}
}

result.set(Constants.CDC_DATA, rootNode);
result.put(Constants.CDC_OP, op);
LOG.debug("Loaded data: {}", result.toString());
collector.collect(result.toString());
}

@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
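The deserializer above is meant to be plugged into the MySQL CDC source built by HugeGraphFlinkCDCLoader, which is not shown in this excerpt. Below is a hedged sketch of the typical wiring using the legacy SourceFunction-style builder from flink-connector-mysql-cdc; the connection settings are placeholders and the actual loader may wire things differently.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.baidu.hugegraph.loader.flink.HugeGraphDeserialization;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;

public class FlinkCdcWiringSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; the real loader derives them
        // from its own input configuration.
        DebeziumSourceFunction<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("example_db")
                .tableList("example_db.example_table")
                .username("username")
                .password("password")
                // Emit each change event as the JSON produced above:
                // {"data": {...}, "op": "c" | "r" | "u" | "d"}
                .deserializer(new HugeGraphDeserialization())
                .build();

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // A print sink stands in for the HugeGraph sink used by the loader.
        env.addSource(source).print();
        env.execute("flink-cdc-wiring-sketch");
    }
}
```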
