Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,36 @@ private void parseNodeRelation(NodeRelation relation, Map<String, Node> nodeMap,
"relation must have at least one output node");
relation.getOutputs().forEach(s -> {
Preconditions.checkNotNull(s, "node id in outputs is null");
Node node = nodeMap.get(s);
Preconditions.checkNotNull(node, "can not find any node by node id " + s);
parseNode(node, relation, nodeMap, relationMap);
Node outputNode = nodeMap.get(s);
Preconditions.checkNotNull(outputNode, "can not find any node by node id " + s);
parseInputNodes(relation, nodeMap, relationMap);
parseSingleNode(outputNode, relation, nodeMap);
// for Load node we need to generate insert sql
if (outputNode instanceof LoadNode) {
insertSqls.add(genLoadNodeInsertSql((LoadNode) outputNode, relation, nodeMap));
}
});
log.info("parse node relation success, relation:{}", relation);
}

/**
* parse the input nodes corresponding to the output node
* @param relation Define relations between nodes, it also shows the data flow
* @param nodeMap Store the mapping relation between node id and node
* @param relationMap Store the mapping relation between node id and relation
*/
private void parseInputNodes(NodeRelation relation, Map<String, Node> nodeMap,
Map<String, NodeRelation> relationMap) {
for (String upstreamNodeId : relation.getInputs()) {
if (!hasParsedSet.contains(upstreamNodeId)) {
Node upstreamNode = nodeMap.get(upstreamNodeId);
Preconditions.checkNotNull(upstreamNode,
"can not find any node by node id " + upstreamNodeId);
parseSingleNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap);
}
}
}

private void registerTableSql(Node node, String sql) {
if (node instanceof ExtractNode) {
extractTableSqls.add(sql);
Expand All @@ -246,15 +269,13 @@ private void registerTableSql(Node node, String sql) {
}

/**
* Parse a node and recursively resolve its dependent nodes
* Parse a single node and generate the corresponding sql
*
* @param node The abstract of extract, transform, load
* @param relation Define relations between nodes, it also shows the data flow
* @param nodeMap store the mapping relation between node id and node
* @param relationMap Store the mapping relation between node id and relation
*/
private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeMap,
Map<String, NodeRelation> relationMap) {
private void parseSingleNode(Node node, NodeRelation relation, Map<String, Node> nodeMap) {
if (hasParsedSet.contains(node.getId())) {
log.warn("the node has already been parsed, node id:{}", node.getId());
return;
Expand All @@ -267,22 +288,10 @@ private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeM
hasParsedSet.add(node.getId());
} else {
Preconditions.checkNotNull(relation, "relation is null");
for (String upstreamNodeId : relation.getInputs()) {
if (!hasParsedSet.contains(upstreamNodeId)) {
Node upstreamNode = nodeMap.get(upstreamNodeId);
Preconditions.checkNotNull(upstreamNode,
"can not find any node by node id " + upstreamNodeId);
parseNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap, relationMap);
}
}
if (node instanceof LoadNode) {
String createSql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), createSql);
registerTableSql(node, createSql);
LoadNode loadNode = (LoadNode) node;
String insertSql = genLoadNodeInsertSql(loadNode, relation, nodeMap);
log.info("node id:{}, insert sql:\n{}", node.getId(), insertSql);
insertSqls.add(insertSql);
hasParsedSet.add(node.getId());
} else if (node instanceof TransformNode) {
TransformNode transformNode = (TransformNode) node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ private MySqlExtractNode buildAllMigrateExtractNode() {
ExtractMode.CDC, null, null);
}

private MySqlExtractNode buildAllMigrateExtractNode2() {

Map<String, String> option = new HashMap<>();
option.put("append-mode", "false");
option.put("migrate-all", "true");
List<String> tables = new ArrayList(10);
tables.add("test.*");
List<FieldInfo> fields = Collections.singletonList(
new MetaFieldInfo("data", MetaField.DATA));

return new MySqlExtractNode("2", "mysql_input", fields,
null, option, null,
tables, "localhost", "root", "inlong",
"test", null, null, true, null,
ExtractMode.CDC, null, null);
}

private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
List<FieldInfo> fields = Collections.singletonList(
new MetaFieldInfo("data", MetaField.DATA_BYTES_DEBEZIUM));
Expand Down Expand Up @@ -101,7 +118,7 @@ private KafkaLoadNode buildAllMigrateKafkaNode() {
new FieldInfo("data", new StringFormatInfo())));
CsvFormat csvFormat = new CsvFormat();
csvFormat.setDisableQuoteCharacter(true);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
return new KafkaLoadNode("3", "kafka_output", fields, relations, null, null,
"topic", "localhost:9092",
csvFormat, null,
null, null);
Expand Down Expand Up @@ -140,6 +157,36 @@ public void testAllMigrate() throws Exception {
Assert.assertTrue(result.tryExecute());
}

/**
* Test all migrate with two input nodes and one output node (two relations)
*
* @throws Exception The exception may throws when execute the case
*/
@Test
public void testAllMigrateMultiRelations() throws Exception {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
Node inputNode = buildAllMigrateExtractNode();
Node inputNode2 = buildAllMigrateExtractNode2();
Node outputNode = buildAllMigrateKafkaNode();
StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, inputNode2, outputNode),
Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
Collections.singletonList(outputNode)),
buildNodeRelation(Collections.singletonList(inputNode2),
Collections.singletonList(outputNode))));
GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}

/**
* Test all migrate, the full database data is represented as bytes of canal json
*
Expand Down