Skip to content

Commit dd64ed5

Browse files
authored
[Feature][Connector-V2][Assert] Support check the precision and scale of Decimal type. (#6110)
1 parent 4def0f9 commit dd64ed5

File tree

8 files changed

+137
-10
lines changed

8 files changed

+137
-10
lines changed

release-note.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@
185185
- [Transform-V2] Add Catalog support for FilterRowKindTransform (#4420)
186186
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
187187
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
188+
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
188189

189190
### Zeta(ST-Engine)
190191

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.assertion.excecutor;
1919

20+
import org.apache.seatunnel.api.table.type.DecimalType;
2021
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2122
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
24+
import org.apache.seatunnel.api.table.type.SqlType;
2325
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorErrorCode;
2426
import org.apache.seatunnel.connectors.seatunnel.assertion.exception.AssertConnectorException;
2527
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
@@ -163,6 +165,21 @@ private boolean compareValue(Object value, AssertFieldRule.AssertRule valueRule)
163165
}
164166

165167
private Boolean checkType(Object value, SeaTunnelDataType<?> fieldType) {
168+
if (fieldType.getSqlType() == SqlType.DECIMAL) {
169+
return checkDecimalType(value, fieldType);
170+
}
166171
return value.getClass().equals(fieldType.getTypeClass());
167172
}
173+
174+
private static Boolean checkDecimalType(Object value, SeaTunnelDataType<?> fieldType) {
175+
if (!value.getClass().equals(fieldType.getTypeClass())) {
176+
return false;
177+
}
178+
DecimalType fieldDecimalType = (DecimalType) fieldType;
179+
BigDecimal valueObj = (BigDecimal) value;
180+
if (valueObj.scale() != fieldDecimalType.getScale()) {
181+
return false;
182+
}
183+
return valueObj.precision() <= fieldDecimalType.getPrecision();
184+
}
168185
}

seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/rule/AssertRuleParser.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.regex.Matcher;
32+
import java.util.regex.Pattern;
3133
import java.util.stream.Collectors;
3234

3335
import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.EQUALS_TO;
@@ -39,6 +41,9 @@
3941

4042
public class AssertRuleParser {
4143

44+
private static final Pattern DECIMAL_TYPE_PATTERN =
45+
Pattern.compile("^decimal\\(\\s*(\\d+)\\s*,\\s*(\\d+)\\s*\\)$");
46+
4247
public List<AssertFieldRule.AssertRule> parseRowRules(List<? extends Config> rowRuleList) {
4348

4449
return assembleFieldValueRules(rowRuleList);
@@ -91,13 +96,14 @@ private List<AssertFieldRule.AssertRule> assembleFieldValueRules(
9196
}
9297

9398
private SeaTunnelDataType<?> getFieldType(String fieldTypeStr) {
94-
if (fieldTypeStr.toLowerCase().startsWith("decimal(")) {
95-
String lengthAndScale =
96-
fieldTypeStr.toLowerCase().replace("decimal(", "").replace(")", "");
97-
String[] split = lengthAndScale.split(",");
98-
return new DecimalType(Integer.valueOf(split[0]), Integer.valueOf(split[1]));
99+
final String normalTypeStr = fieldTypeStr.trim().toLowerCase();
100+
Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(normalTypeStr);
101+
if (matcher.find()) {
102+
int precision = Integer.parseInt(matcher.group(1));
103+
int scale = Integer.parseInt(matcher.group(2));
104+
return new DecimalType(precision, scale);
99105
}
100-
return TYPES.get(fieldTypeStr.toLowerCase());
106+
return TYPES.get(normalTypeStr);
101107
}
102108

103109
private static final Map<String, SeaTunnelDataType<?>> TYPES = Maps.newHashMap();

seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/AssertExecutorTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.flink.assertion;
1919

2020
import org.apache.seatunnel.api.table.type.BasicType;
21+
import org.apache.seatunnel.api.table.type.DecimalType;
2122
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2223
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,8 +29,11 @@
2829

2930
import com.google.common.collect.Lists;
3031

32+
import java.math.BigDecimal;
33+
import java.util.Collections;
3134
import java.util.List;
3235

36+
import static org.junit.jupiter.api.Assertions.assertEquals;
3337
import static org.junit.jupiter.api.Assertions.assertNotNull;
3438
import static org.junit.jupiter.api.Assertions.assertNull;
3539

@@ -111,4 +115,51 @@ private AssertFieldRule getFieldRule4Name() {
111115
rule.setFieldRules(valueRules);
112116
return rule;
113117
}
118+
119+
@Test
120+
public void testDecimalTypeCheck() {
121+
List<AssertFieldRule> rules = Lists.newArrayList();
122+
AssertFieldRule rule = new AssertFieldRule();
123+
rule.setFieldName("c_mock");
124+
DecimalType assertFieldType = new DecimalType(10, 2);
125+
rule.setFieldType(assertFieldType);
126+
127+
AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
128+
valueRule.setEqualTo("99999999.90");
129+
130+
rule.setFieldRules(Collections.singletonList(valueRule));
131+
rules.add(rule);
132+
133+
SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {new BigDecimal("99999999.90")});
134+
SeaTunnelRowType mockType =
135+
new SeaTunnelRowType(
136+
new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
137+
138+
AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
139+
assertNull(failRule);
140+
}
141+
142+
@Test
143+
public void testDecimalTypeCheckError() {
144+
List<AssertFieldRule> rules = Lists.newArrayList();
145+
AssertFieldRule rule = new AssertFieldRule();
146+
rule.setFieldName("c_mock");
147+
DecimalType assertFieldType = new DecimalType(1, 0);
148+
rule.setFieldType(assertFieldType);
149+
150+
AssertFieldRule.AssertRule valueRule = new AssertFieldRule.AssertRule();
151+
valueRule.setRuleType(AssertFieldRule.AssertRuleType.NOT_NULL);
152+
rule.setFieldRules(Collections.singletonList(valueRule));
153+
rules.add(rule);
154+
155+
SeaTunnelRow mockRow = new SeaTunnelRow(new Object[] {BigDecimal.valueOf(99999999.99)});
156+
SeaTunnelRowType mockType =
157+
new SeaTunnelRowType(
158+
new String[] {"c_mock"}, new SeaTunnelDataType[] {new DecimalType(10, 2)});
159+
160+
AssertFieldRule failRule = assertExecutor.fail(mockRow, mockType, rules).orElse(null);
161+
assertNotNull(failRule);
162+
assertEquals(assertFieldType, failRule.getFieldType());
163+
assertEquals("c_mock", failRule.getFieldName());
164+
}
114165
}

seatunnel-connectors-v2/connector-assert/src/test/java/org/apache/seatunnel/flink/assertion/rule/AssertRuleParserTest.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
2222

2323
import org.apache.seatunnel.api.table.type.BasicType;
24+
import org.apache.seatunnel.api.table.type.DecimalType;
2425
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertFieldRule;
2526
import org.apache.seatunnel.connectors.seatunnel.assertion.rule.AssertRuleParser;
2627

@@ -37,8 +38,39 @@ public class AssertRuleParserTest {
3738
public void testParseRules() {
3839
List<? extends Config> ruleConfigList = assembleConfig();
3940
List<AssertFieldRule> assertFieldRules = parser.parseRules(ruleConfigList);
40-
assertEquals(assertFieldRules.size(), 2);
41-
assertEquals(assertFieldRules.get(0).getFieldType(), BasicType.STRING_TYPE);
41+
assertEquals(3, assertFieldRules.size());
42+
43+
AssertFieldRule nameRule = assertFieldRules.get(0);
44+
List<AssertFieldRule.AssertRule> nameValueRules = nameRule.getFieldRules();
45+
assertEquals(BasicType.STRING_TYPE, nameRule.getFieldType());
46+
assertEquals("name", nameRule.getFieldName());
47+
assertEquals(3, nameValueRules.size());
48+
assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, nameValueRules.get(0).getRuleType());
49+
assertEquals(
50+
AssertFieldRule.AssertRuleType.MIN_LENGTH, nameValueRules.get(1).getRuleType());
51+
assertEquals(3.0, nameValueRules.get(1).getRuleValue());
52+
assertEquals(
53+
AssertFieldRule.AssertRuleType.MAX_LENGTH, nameValueRules.get(2).getRuleType());
54+
assertEquals(5.0, nameValueRules.get(2).getRuleValue());
55+
56+
AssertFieldRule ageRule = assertFieldRules.get(1);
57+
List<AssertFieldRule.AssertRule> ageValueRules = ageRule.getFieldRules();
58+
assertEquals("age", ageRule.getFieldName());
59+
assertEquals(3, ageValueRules.size());
60+
assertEquals(AssertFieldRule.AssertRuleType.NOT_NULL, ageValueRules.get(0).getRuleType());
61+
assertEquals(AssertFieldRule.AssertRuleType.MIN, ageValueRules.get(1).getRuleType());
62+
assertEquals(10.0, ageValueRules.get(1).getRuleValue());
63+
assertEquals(AssertFieldRule.AssertRuleType.MAX, ageValueRules.get(2).getRuleType());
64+
assertEquals(20.0, ageValueRules.get(2).getRuleValue());
65+
66+
AssertFieldRule decimalRule = assertFieldRules.get(2);
67+
List<AssertFieldRule.AssertRule> decimalValueRules = decimalRule.getFieldRules();
68+
assertEquals("c_decimal", decimalRule.getFieldName());
69+
assertEquals(new DecimalType(10, 2), decimalRule.getFieldType());
70+
assertEquals(2, decimalValueRules.size());
71+
assertEquals(
72+
AssertFieldRule.AssertRuleType.NOT_NULL, decimalValueRules.get(0).getRuleType());
73+
assertEquals("12.12", decimalValueRules.get(1).getEqualTo());
4274
}
4375

4476
private List<? extends Config> assembleConfig() {
@@ -76,6 +108,17 @@ private List<? extends Config> assembleConfig() {
76108
+ " rule_value = 20\n"
77109
+ " }\n"
78110
+ " ]\n"
111+
+ " },{\n"
112+
+ " field_name = c_decimal\n"
113+
+ " field_type= \" decimal( 10 , 2 ) \"\n"
114+
+ " field_value = [\n"
115+
+ " {\n"
116+
+ " rule_type = NOT_NULL\n"
117+
+ " },\n"
118+
+ " {\n"
119+
+ " equals_to = \"12.12\"\n"
120+
+ " }\n"
121+
+ " ]\n"
79122
+ " }\n"
80123
+ " ]\n"
81124
+ " \n"

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ protected void insertTestData() {
131131
+ " TRUE,"
132132
+ " '2023-09-04',"
133133
+ " '2023-09-04 10:30:00',"
134-
+ " 42.12,"
134+
+ " 42.10,"
135135
+ " 42.12)");
136136
}
137137
} catch (Exception exception) {

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ sink{
126126
{
127127
field_name = hive_e2e_source_table.decimal_column
128128
field_type = "decimal(10,2)"
129-
field_value = [{equals_to = 42.12}]
129+
field_value = [{equals_to = "42.10"}]
130130
},
131131
{
132132
field_name = hive_e2e_source_table.numeric_column

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ sink {
105105
rule_type = NOT_NULL
106106
}
107107
]
108+
},
109+
{
110+
field_name = c_decimal
111+
field_type = "decimal(33, 18)"
112+
field_value = [
113+
{
114+
rule_type = NOT_NULL
115+
}
116+
]
108117
}
109118
]
110119
}

0 commit comments

Comments
 (0)