From 9a0739dcdfc3b865ea3e74f47bd1188a18abe60a Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 17 May 2017 22:48:00 +0800 Subject: [PATCH 1/3] support encoding/decoding of TIME --- .../java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java | 4 ++++ .../org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index 7b6428ed2194..d006edf15a44 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -176,6 +176,10 @@ public Date getDate(int idx) { return (Date) getFieldValue(idx); } + public GregorianCalendar getGregorianCalendar(int idx) { + return (GregorianCalendar) getFieldValue(idx); + } + public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 0accb9a4f6a3..be029bfe5e3b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Date; +import java.util.GregorianCalendar; import java.util.List; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -87,6 +88,9 @@ public void encode(BeamSQLRow value, OutputStream outStream, case VARCHAR: stringCoder.encode(value.getString(idx), outStream, context.nested()); break; + case TIME: + longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), + outStream, context.nested()); case TIMESTAMP: longCoder.encode(value.getDate(idx).getTime(), outStream, context); break; @@ -136,6 +140,11 @@ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder. case VARCHAR: record.addField(idx, stringCoder.decode(inStream, context.nested())); break; + case TIME: + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date(longCoder.decode(inStream, context.nested()))); + record.addField(idx, calendar); + break; case TIMESTAMP: record.addField(idx, new Date(longCoder.decode(inStream, context))); break; From 12331d5af6482f18ddfaec2b1e22e192a1605c52 Mon Sep 17 00:00:00 2001 From: James Xu Date: Wed, 17 May 2017 22:52:08 +0800 Subject: [PATCH 2/3] support new data type: DECIMAL --- .../beam/dsls/sql/schema/BeamSQLRow.java | 260 ++++++++++-------- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 18 +- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 37 ++- 3 files changed, 180 insertions(+), 135 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java index d006edf15a44..5f7c2b4a9cbd 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java @@ -18,6 +18,7 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Date; import java.util.GregorianCalendar; @@ -84,62 +85,68 @@ public void addField(int index, Object fieldValue) { SqlTypeName fieldType = dataType.getFieldsType().get(index); switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case VARCHAR: - if (!(fieldValue instanceof String)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } - break; - default: - throw new UnsupportedDataTypeException(fieldType); + case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TINYINT: + if (!(fieldValue instanceof Byte)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case DOUBLE: + if (!(fieldValue instanceof Double)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case BIGINT: + if (!(fieldValue instanceof Long)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case FLOAT: + if (!(fieldValue instanceof Float)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case DECIMAL: + if (!(fieldValue instanceof BigDecimal)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case VARCHAR: + if (!(fieldValue instanceof String)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TIME: + if (!(fieldValue instanceof GregorianCalendar)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + case TIMESTAMP: + if (!(fieldValue instanceof Date)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } + break; + default: + throw new UnsupportedDataTypeException(fieldType); } dataValues.set(index, fieldValue); } @@ -180,6 +187,10 @@ public GregorianCalendar getGregorianCalendar(int idx) { return (GregorianCalendar) getFieldValue(idx); } + public BigDecimal getBigDecimal(int idx) { + return (BigDecimal) getFieldValue(idx); + } + public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); } @@ -193,71 +204,78 @@ public Object getFieldValue(int fieldIdx) { SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx); switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case VARCHAR: - if (!(fieldValue instanceof String)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIMESTAMP: - if (!(fieldValue instanceof Date)) { - throw new InvalidFieldException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - default: - throw new UnsupportedDataTypeException(fieldType); + case INTEGER: + if (!(fieldValue instanceof Integer)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case SMALLINT: + if (!(fieldValue instanceof Short)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TINYINT: + if (!(fieldValue instanceof Byte)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case DOUBLE: + if (!(fieldValue instanceof Double)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case DECIMAL: + if (!(fieldValue instanceof BigDecimal)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case BIGINT: + if (!(fieldValue instanceof Long)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case FLOAT: + if (!(fieldValue instanceof Float)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case VARCHAR: + if (!(fieldValue instanceof String)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TIME: + if (!(fieldValue instanceof GregorianCalendar)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + case TIMESTAMP: + if (!(fieldValue instanceof Date)) { + throw new InvalidFieldException( + String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + } else { + return fieldValue; + } + default: + throw new UnsupportedDataTypeException(fieldType); } } diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index be029bfe5e3b..5d76be1d18aa 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -23,6 +23,8 @@ import java.util.Date; import java.util.GregorianCalendar; import java.util.List; + +import org.apache.beam.sdk.coders.BigDecimalCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -47,6 +49,7 @@ public class BeamSqlRowCoder extends StandardCoder{ private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of(); private static final DoubleCoder doubleCoder = DoubleCoder.of(); private static final InstantCoder instantCoder = InstantCoder.of(); + private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of(); private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder(); private BeamSqlRowCoder(){} @@ -82,6 +85,9 @@ public void encode(BeamSQLRow value, OutputStream outStream, case FLOAT: doubleCoder.encode((double) value.getFloat(idx), outStream, context.nested()); break; + case DECIMAL: + bigDecimalCoder.encode(value.getBigDecimal(idx), outStream, context.nested()); + break; case BIGINT: longCoder.encode(value.getLong(idx), outStream, context.nested()); break; @@ -91,8 +97,9 @@ public void encode(BeamSQLRow value, OutputStream outStream, case TIME: longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream, context.nested()); + break; case TIMESTAMP: - longCoder.encode(value.getDate(idx).getTime(), outStream, context); + longCoder.encode(value.getDate(idx).getTime(), outStream, context.nested()); break; default: @@ -100,7 +107,7 @@ public void encode(BeamSQLRow value, OutputStream outStream, } } - instantCoder.encode(value.getWindowStart(), outStream, context.nested()); + instantCoder.encode(value.getWindowStart(), outStream, context); instantCoder.encode(value.getWindowEnd(), outStream, context); } @@ -137,6 +144,9 @@ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder. case BIGINT: record.addField(idx, longCoder.decode(inStream, context.nested())); break; + case DECIMAL: + record.addField(idx, bigDecimalCoder.decode(inStream, context.nested())); + break; case VARCHAR: record.addField(idx, stringCoder.decode(inStream, context.nested())); break; @@ -146,7 +156,7 @@ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder. record.addField(idx, calendar); break; case TIMESTAMP: - record.addField(idx, new Date(longCoder.decode(inStream, context))); + record.addField(idx, new Date(longCoder.decode(inStream, context.nested()))); break; default: @@ -154,7 +164,7 @@ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder. } } - record.setWindowStart(instantCoder.decode(inStream, context.nested())); + record.setWindowStart(instantCoder.decode(inStream, context)); record.setWindowEnd(instantCoder.decode(inStream, context)); return record; diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index f2077947ec3a..bc6343b92b70 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -18,6 +18,10 @@ package org.apache.beam.dsls.sql.schema; +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; + import org.apache.beam.sdk.testing.CoderProperties; import org.apache.calcite.jdbc.JavaTypeFactoryImpl; import org.apache.calcite.rel.type.RelDataType; @@ -38,11 +42,16 @@ public void encodeAndDecode() throws Exception { @Override public RelDataType apply(RelDataTypeFactory a0) { return a0.builder() - .add("id", SqlTypeName.INTEGER) - .add("order_id", SqlTypeName.BIGINT) - .add("price", SqlTypeName.FLOAT) - .add("amount", SqlTypeName.DOUBLE) - .add("user_name", SqlTypeName.VARCHAR) + .add("col_tinyint", SqlTypeName.TINYINT) + .add("col_smallint", SqlTypeName.SMALLINT) + .add("col_integer", SqlTypeName.INTEGER) + .add("col_bigint", SqlTypeName.BIGINT) + .add("col_float", SqlTypeName.FLOAT) + .add("col_double", SqlTypeName.DOUBLE) + .add("col_decimal", SqlTypeName.DECIMAL) + .add("col_string_varchar", SqlTypeName.VARCHAR) + .add("col_time", SqlTypeName.TIME) + .add("col_timestamp", SqlTypeName.TIMESTAMP) .build(); } }; @@ -51,11 +60,19 @@ public RelDataType apply(RelDataTypeFactory a0) { protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); - row.addField(0, 1); - row.addField(1, 1L); - row.addField(2, 1.1F); - row.addField(3, 1.1); - row.addField(4, "hello"); + row.addField("col_tinyint", Byte.valueOf("1")); + row.addField("col_smallint", Short.valueOf("1")); + row.addField("col_integer", 1); + row.addField("col_bigint", 1L); + row.addField("col_float", 1.1F); + row.addField("col_double", 1.1); + row.addField("col_decimal", BigDecimal.ZERO); + row.addField("col_string_varchar", "hello"); + GregorianCalendar calendar = new GregorianCalendar(); + calendar.setTime(new Date()); + row.addField("col_time", calendar); + row.addField("col_timestamp", new Date()); + BeamSqlRowCoder coder = BeamSqlRowCoder.of(); CoderProperties.coderDecodeEncodeEqual(coder, row); From 9ff1a6a34366425879c504a158b9e0199912ca5e Mon Sep 17 00:00:00 2001 From: James Xu Date: Fri, 19 May 2017 14:27:54 +0800 Subject: [PATCH 3/3] minor --- .../java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index 5d76be1d18aa..c72ba62e8173 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -107,7 +107,7 @@ public void encode(BeamSQLRow value, OutputStream outStream, } } - instantCoder.encode(value.getWindowStart(), outStream, context); + instantCoder.encode(value.getWindowStart(), outStream, context.nested()); instantCoder.encode(value.getWindowEnd(), outStream, context); } @@ -164,7 +164,7 @@ public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder. } } - record.setWindowStart(instantCoder.decode(inStream, context)); + record.setWindowStart(instantCoder.decode(inStream, context.nested())); record.setWindowEnd(instantCoder.decode(inStream, context)); return record;