From c9fde0e38e88f4a56362364c1a802c6de62fd39d Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 19 Sep 2016 16:49:36 +0200 Subject: [PATCH] Improve BigDecimal parser --- .../flink/types/parser/BigDecParser.java | 30 +++++++++++++++---- .../flink/types/parser/BigIntParser.java | 2 +- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java index d6c71fe4dc8199..8582b0d0524939 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java @@ -20,6 +20,7 @@ package org.apache.flink.types.parser; import java.math.BigDecimal; +import java.nio.ByteBuffer; import org.apache.flink.annotation.PublicEvolving; /** @@ -31,6 +32,7 @@ public class BigDecParser extends FieldParser { private static final BigDecimal BIG_DECIMAL_INSTANCE = BigDecimal.ZERO; private BigDecimal result; + private char[] reuse = null; @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, BigDecimal reusable) { @@ -41,7 +43,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, B while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; @@ -55,9 +57,20 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, B return -1; } - String str = new String(bytes, startPos, i - startPos); try { - this.result = new BigDecimal(str); + final int length = i - startPos; + if (reuse == null || reuse.length != length) { + reuse = new char[length]; + } + for (int j = 0; j < length; j++) { + final byte b = bytes[startPos + j]; + if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') { + throw new NumberFormatException(); + } + reuse[j] = (char) bytes[startPos + j]; + } + + this.result = new BigDecimal(reuse); return (i == limit) ? limit : i + delimiter.length; } catch (NumberFormatException e) { setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR); @@ -120,7 +133,14 @@ public static final BigDecimal parseField(byte[] bytes, int startPos, int length throw new NumberFormatException("There is leading or trailing whitespace in the numeric field."); } - String str = new String(bytes, startPos, i); - return new BigDecimal(str); + final char[] reuse = new char[i]; + for (int j = 0; j < i; j++) { + final byte b = bytes[startPos + j]; + if ((b < '0' || b > '9') && b != '-' && b != '+' && b != '.' && b != 'E' && b != 'e') { + throw new NumberFormatException(); + } + reuse[j] = (char) bytes[startPos + j]; + } + return new BigDecimal(reuse); } } diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java index 912305a428da23..13361c176b961f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java @@ -41,7 +41,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, B while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break;