diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java new file mode 100644 index 0000000000000..46a07fa041451 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import java.math.BigDecimal; +import org.apache.flink.annotation.PublicEvolving; + +/** + * Parses a text field into a {@link java.math.BigDecimal}. + */ +@PublicEvolving +public class BigDecParser extends FieldParser { + + private static final BigDecimal BIG_DECIMAL_INSTANCE = BigDecimal.ZERO; + + private BigDecimal result; + private char[] reuse = null; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) { + int i = startPos; + + final int delimLimit = limit - delimiter.length + 1; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } + break; + } + i++; + } + + if (i > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) { + setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + return -1; + } + + try { + final int length = i - startPos; + if (reuse == null || reuse.length < length) { + reuse = new char[length]; + } + for (int j = 0; j < length; j++) { + final byte b = bytes[startPos + j]; + if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') { + throw new NumberFormatException(); + } + reuse[j] = (char) bytes[startPos + j]; + } + + this.result = new BigDecimal(reuse, 0, length); + return (i == limit) ? limit : i + delimiter.length; + } catch (NumberFormatException e) { + setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); + return -1; + } + } + + @Override + public BigDecimal createValue() { + return BIG_DECIMAL_INSTANCE; + } + + @Override + public BigDecimal getLastResult() { + return this.result; + } + + /** + * Static utility to parse a field of type BigDecimal from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @return The parsed value. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final BigDecimal parseField(byte[] bytes, int startPos, int length) { + return parseField(bytes, startPos, length, (char) 0xffff); + } + + /** + * Static utility to parse a field of type BigDecimal from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @param delimiter The delimiter that terminates the field. + * @return The parsed value. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final BigDecimal parseField(byte[] bytes, int startPos, int length, char delimiter) { + if (length <= 0) { + throw new NumberFormatException("Invalid input: Empty string"); + } + int i = 0; + final byte delByte = (byte) delimiter; + + while (i < length && bytes[startPos + i] != delByte) { + i++; + } + + if (i > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) { + throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); + } + + final char[] chars = new char[i]; + for (int j = 0; j < i; j++) { + final byte b = bytes[startPos + j]; + if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') { + throw new NumberFormatException(); + } + chars[j] = (char) bytes[startPos + j]; + } + return new BigDecimal(chars); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java new file mode 100644 index 0000000000000..13361c176b961 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import java.math.BigInteger; +import org.apache.flink.annotation.PublicEvolving; + +/** + * Parses a text field into a {@link java.math.BigInteger}. + */ +@PublicEvolving +public class BigIntParser extends FieldParser { + + private static final BigInteger BIG_INTEGER_INSTANCE = BigInteger.ZERO; + + private BigInteger result; + + @Override + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigInteger reusable) { + int i = startPos; + + final int delimLimit = limit - delimiter.length + 1; + + while (i < limit) { + if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } + break; + } + i++; + } + + if (i > startPos && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[(i - 1)]))) { + setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER); + return -1; + } + + String str = new String(bytes, startPos, i - startPos); + try { + this.result = new BigInteger(str); + return (i == limit) ? limit : i + delimiter.length; + } catch (NumberFormatException e) { + setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); + return -1; + } + } + + @Override + public BigInteger createValue() { + return BIG_INTEGER_INSTANCE; + } + + @Override + public BigInteger getLastResult() { + return this.result; + } + + /** + * Static utility to parse a field of type BigInteger from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @return The parsed value. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final BigInteger parseField(byte[] bytes, int startPos, int length) { + return parseField(bytes, startPos, length, (char) 0xffff); + } + + /** + * Static utility to parse a field of type BigInteger from a byte sequence that represents text + * characters + * (such as when read from a file stream). + * + * @param bytes The bytes containing the text data that should be parsed. + * @param startPos The offset to start the parsing. + * @param length The length of the byte sequence (counting from the offset). + * @param delimiter The delimiter that terminates the field. + * @return The parsed value. + * @throws NumberFormatException Thrown when the value cannot be parsed because the text + * represents not a correct number. + */ + public static final BigInteger parseField(byte[] bytes, int startPos, int length, char delimiter) { + if (length <= 0) { + throw new NumberFormatException("Invalid input: Empty string"); + } + int i = 0; + final byte delByte = (byte) delimiter; + + while (i < length && bytes[startPos + i] != delByte) { + i++; + } + + if (i > 0 && + (Character.isWhitespace(bytes[startPos]) || Character.isWhitespace(bytes[startPos + i - 1]))) { + throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); + } + + String str = new String(bytes, startPos, i); + return new BigInteger(str); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 5f17840a6bc17..a1b9c5f4fda66 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -19,6 +19,8 @@ package org.apache.flink.types.parser; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.HashMap; import java.util.Map; @@ -208,6 +210,8 @@ public static Class> getParserForType(Class type) { PARSERS.put(Float.class, FloatParser.class); PARSERS.put(Double.class, DoubleParser.class); PARSERS.put(Boolean.class, BooleanParser.class); + PARSERS.put(BigDecimal.class, BigDecParser.class); + PARSERS.put(BigInteger.class, BigIntParser.class); // value types PARSERS.put(ByteValue.class, ByteValueParser.class); diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java new file mode 100644 index 0000000000000..9e17565e7de4b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + + +import java.math.BigDecimal; + +public class BigDecParserTest extends ParserTestBase { + + @Override + public String[] getValidTestValues() { + return new String[] { + "-12.5E1000", "-12.5E100", "-10000", "-1.1", "-1", "-0.44", + "0", "0000000", "0e0", "0.000000000000000000000000001", "0.0000001", + "0.1234123413478523984729447", "1", "10000", "10E100000", "10E1000000000" + }; + } + + @Override + public BigDecimal[] getValidTestResults() { + return new BigDecimal[] { + new BigDecimal("-12.5E1000"), + new BigDecimal("-12.5E100"), + new BigDecimal("-10000"), + new BigDecimal("-1.1"), + new BigDecimal("-1"), + new BigDecimal("-0.44"), + new BigDecimal("0"), + new BigDecimal("0"), + new BigDecimal("0e0"), + new BigDecimal("0.000000000000000000000000001"), + new BigDecimal("0.0000001"), + new BigDecimal("0.1234123413478523984729447"), + new BigDecimal("1"), + new BigDecimal("10000"), + new BigDecimal("10E100000"), + new BigDecimal("10E1000000000"), + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser getParser() { + return new BigDecParser(); + } + + @Override + public Class getTypeClass() { + return BigDecimal.class; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java new file mode 100644 index 0000000000000..473cb55a81582 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + + +import java.math.BigInteger; + +public class BigIntParserTest extends ParserTestBase { + + @Override + public String[] getValidTestValues() { + return new String[] { + "-8745979691234123413478523984729447", "-10000", "-1", "0", + "0000000", "8745979691234123413478523984729447" + }; + } + + @Override + public BigInteger[] getValidTestResults() { + return new BigInteger[] { + new BigInteger("-8745979691234123413478523984729447"), + new BigInteger("-10000"), + new BigInteger("-1"), + new BigInteger("0"), + new BigInteger("0"), + new BigInteger("8745979691234123413478523984729447") + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + "1.1" ,"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", "\t" + }; + } + + @Override + public boolean allowsEmptyField() { + return false; + } + + @Override + public FieldParser getParser() { + return new BigIntParser(); + } + + @Override + public Class getTypeClass() { + return BigInteger.class; + } +}