[Feature][Connector] add IT for Assert Sink in e2e module (#2036)
* [Feature][Connector] add IT for Assert Sink in e2e module

* [Feature][Connector] add IT for Assert Sink in e2e module
add licence

* Update fakesource_to_assert.conf

amend assert config to fit FakeSource config

* [Feature][Connector] amend method name

* [connector][assert] there must be a sink in a Flink batch program before it can run, so add print()

* [connector][assert] use collect() instead of print() in DataSet

* [connector][assert] Fixed AssertSink submitting the job twice (illustrated in the sketch after the changed-file summary below)
lhyundeadsoul committed Jul 1, 2022
1 parent c74a167 commit cd1241d
Showing 5 changed files with 139 additions and 11 deletions.
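The last three commit-message bullets turn on one Flink detail: a batch plan only runs once it contains a sink, and both print() and collect() double as sinks that submit a job by themselves. The sketch below is not part of the commit (the class name and sample data are invented) and only illustrates why a map()-based assertion never executes, and why the collect()-based replacement must not be followed by a second execute():

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Illustrative sketch only (not from this commit): why AssertSink moved from map() to collect().
public class CollectVsMapSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> names = env.fromElements("Tom", "Alice", "Bob");

        // A map() alone is lazy and is not a sink: with no sink in the plan,
        // env.execute() refuses to run because no data sink has been defined,
        // and without a job submission the assertion code never runs at all.
        DataSet<String> checked = names.map(new MapFunction<String, String>() {
            @Override
            public String map(String name) {
                if (name.length() < 3) {
                    throw new IllegalStateException("name too short: " + name);
                }
                return name;
            }
        });

        // collect() acts as a sink and submits the job immediately, so the rows come
        // back to the client and can be checked right here. Because it already ran
        // the job, the engine must not call env.execute() again afterwards; that is
        // the "submit job twice" problem the last bullet fixes.
        checked.collect().forEach(System.out::println);
    }
}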
@@ -118,6 +118,6 @@ public Config getConfig() {
    }

    private boolean whetherExecute(List<FlinkBatchSink> sinks) {
-        return sinks.stream().anyMatch(s -> !"ConsoleSink".equals(s.getPluginName()));
+        return sinks.stream().noneMatch(s -> "ConsoleSink".equals(s.getPluginName()) || "AssertSink".equals(s.getPluginName()));
    }
}
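With the noneMatch version above, the engine only calls its own execute() when none of the configured sinks is a ConsoleSink or an AssertSink, because those two already submit a Flink job themselves (print() and collect() respectively) and a second submission would run the job twice. A toy, self-contained version of the check, using plugin-name strings instead of FlinkBatchSink instances and not taken from SeaTunnel, behaves like this:

import java.util.Arrays;
import java.util.List;

// Toy re-implementation of the new whetherExecute() logic, for illustration only;
// the real method receives FlinkBatchSink instances, not plain strings.
public class WhetherExecuteSketch {
    static boolean whetherExecute(List<String> sinkPluginNames) {
        return sinkPluginNames.stream()
                .noneMatch(name -> "ConsoleSink".equals(name) || "AssertSink".equals(name));
    }

    public static void main(String[] args) {
        // true: JdbcSink does not run the job itself, so the engine must call execute().
        System.out.println(whetherExecute(Arrays.asList("JdbcSink")));
        // false: AssertSink already submitted a job via collect(); skip execute().
        System.out.println(whetherExecute(Arrays.asList("AssertSink")));
        // false: with noneMatch, one ConsoleSink is enough to suppress execute().
        System.out.println(whetherExecute(Arrays.asList("ConsoleSink", "JdbcSink")));
    }
}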
1 change: 1 addition & 0 deletions seatunnel-connectors/plugin-mapping.properties
@@ -41,6 +41,7 @@ flink.sink.FileSink = seatunnel-connector-flink-file
flink.sink.InfluxDbSink = seatunnel-connector-flink-influxdb
flink.sink.JdbcSink = seatunnel-connector-flink-jdbc
flink.sink.Kafka = seatunnel-connector-flink-kafka
+flink.sink.AssertSink = seatunnel-connector-flink-assert

# Spark Source

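The added property registers the AssertSink plugin name against the connector module that provides it, so the name used in a job config can be resolved to the seatunnel-connector-flink-assert artifact. As a rough illustration only (not SeaTunnel's actual lookup code, and assuming the mapping file is available on the classpath), resolving the name could look like:

import java.io.InputStream;
import java.util.Properties;

// Illustrative only: shows how a flink.sink.<Name> key in plugin-mapping.properties
// maps to a connector artifact name. The real resolution logic in SeaTunnel may differ.
public class PluginMappingSketch {
    public static void main(String[] args) throws Exception {
        Properties mapping = new Properties();
        try (InputStream in = PluginMappingSketch.class
                .getResourceAsStream("/plugin-mapping.properties")) {
            mapping.load(in);
        }
        // "AssertSink" in a job config maps to the seatunnel-connector-flink-assert module.
        System.out.println(mapping.getProperty("flink.sink.AssertSink"));
    }
}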
@@ -29,6 +29,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.google.auto.service.AutoService;
+import lombok.SneakyThrows;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -37,11 +38,11 @@
import java.util.List;

/**
- * A flink sink plugin which can assert illegal data by user defined rules
- * Refer to https://github.com/apache/incubator-seatunnel/issues/1912
+ * A flink sink plugin which can assert illegal data by user defined rules Refer to https://github.com/apache/incubator-seatunnel/issues/1912
*/
@AutoService(BaseFlinkSink.class)
public class AssertSink implements FlinkBatchSink, FlinkStreamSink {

//The assertion executor
private static final AssertExecutor ASSERT_EXECUTOR = new AssertExecutor();
//User defined rules used to assert illegal data
@@ -50,16 +51,19 @@ public class AssertSink implements FlinkBatchSink, FlinkStreamSink {
private Config config;
private List<? extends Config> configList;

+@SneakyThrows
@Override
public void outputBatch(FlinkEnvironment env, DataSet<Row> inDataSet) {
-    inDataSet.map(row -> {
-        ASSERT_EXECUTOR
-            .fail(row, assertFieldRules)
-            .ifPresent(failRule -> {
-                throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
-            });
-        return null;
-    });
+    try {
+        inDataSet.collect().forEach(row ->
+            ASSERT_EXECUTOR
+                .fail(row, assertFieldRules)
+                .ifPresent(failRule -> {
+                    throw new IllegalStateException("row :" + row + " fail rule: " + failRule);
+                }));
+    } catch (Exception ex) {
+        throw new RuntimeException("AssertSink execute failed", ex);
+    }
}

@Override
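The rewritten outputBatch() pulls the rows back to the client with collect() and passes each one to ASSERT_EXECUTOR.fail(row, assertFieldRules), which returns the first violated rule as an Optional. AssertExecutor itself is not part of this diff; the sketch below only illustrates that Optional-based check pattern, with an invented Rule type that does not mirror SeaTunnel's real rule classes:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustration of the Optional-returning check pattern used by AssertSink; Rule and
// the sample rules are made up for this demo.
public class AssertPatternSketch {
    static final class Rule {
        final String name;
        final Predicate<Object> check;
        Rule(String name, Predicate<Object> check) { this.name = name; this.check = check; }
        @Override public String toString() { return name; }
    }

    // Analogue of AssertExecutor.fail(row, rules): an empty Optional means the value passed.
    static Optional<Rule> fail(Object value, List<Rule> rules) {
        return rules.stream().filter(rule -> !rule.check.test(value)).findFirst();
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(
                new Rule("NOT_NULL", v -> v != null),
                new Rule("MIN_LENGTH 3", v -> String.valueOf(v).length() >= 3));

        // "ab" violates MIN_LENGTH 3, so the caller reports the offending rule.
        fail("ab", rules).ifPresent(rule ->
                System.out.println("row :ab fail rule: " + rule));
    }
}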
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.flink.assertion;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import org.junit.Assert;
import org.junit.Test;
import org.testcontainers.containers.Container;

import java.io.IOException;

public class FakeSourceToAssertIT extends FlinkContainer {

    @Test
    public void testFakeSourceToAssertSink() throws IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/assertion/fakesource_to_assert.conf");
        Assert.assertEquals(0, execResult.getExitCode());
    }
}
@@ -0,0 +1,88 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
  # You can set flink configuration here
  execution.parallelism = 1
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    result_table_name = "fake"
    field_name = "name,age"
  }

  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
}

transform {
  sql {
    sql = "select name,age from fake"
  }

  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

sink {
  AssertSink {
    rules = [
      {
        field_name = name
        field_type = string
        field_value = [
          {
            rule_type = NOT_NULL
          },
          {
            rule_type = MIN_LENGTH
            rule_value = 3
          },
          {
            rule_type = MAX_LENGTH
            rule_value = 20
          }
        ]
      },
      {
        field_name = age
        field_type = int
        field_value = [
          {
            rule_type = NOT_NULL
          },
          {
            rule_type = MIN
            rule_value = 1
          },
          {
            rule_type = MAX
            rule_value = 100
          }
        ]
      }
    ]
  }
  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
}
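Read together, the rules above require name to be a non-null string of 3 to 20 characters and age to be a non-null int between 1 and 100. The snippet below is illustrative only (plain Java predicates, not SeaTunnel's rule engine) and just shows which values the job asserts:

import java.util.function.Predicate;

// Encodes the two rule sets from fakesource_to_assert.conf as plain predicates,
// purely to show which values they accept; this is not SeaTunnel code.
public class FakeSourceAssertRules {
    static final Predicate<String> NAME_RULES =     // NOT_NULL, MIN_LENGTH 3, MAX_LENGTH 20
            name -> name != null && name.length() >= 3 && name.length() <= 20;
    static final Predicate<Integer> AGE_RULES =     // NOT_NULL, MIN 1, MAX 100
            age -> age != null && age >= 1 && age <= 100;

    public static void main(String[] args) {
        System.out.println(NAME_RULES.test("Tom"));  // true  (length 3 is allowed)
        System.out.println(NAME_RULES.test("Al"));   // false (shorter than MIN_LENGTH)
        System.out.println(AGE_RULES.test(150));     // false (greater than MAX)
    }
}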
