From 4a6bf4ceaf3fcccedec1bebd40fa3a97f135f119 Mon Sep 17 00:00:00 2001
From: mingmxu
Date: Mon, 7 Aug 2017 16:12:21 -0700
Subject: [PATCH 1/2] [rebased] make BeamRecord an immutable type

---
 .../beam/sdk/coders/BeamRecordCoder.java      |  10 +-
 .../apache/beam/sdk/values/BeamRecord.java    |  22 ++--
 .../sql/example/BeamSqlExample.java           |   5 +-
 .../extensions/sql/impl/rel/BeamJoinRel.java  |   7 +-
 .../sql/impl/rel/BeamValuesRel.java           |   7 +-
 .../transform/BeamAggregationTransforms.java  |  26 ++---
 .../impl/transform/BeamJoinTransforms.java    |  20 +---
 .../sql/impl/transform/BeamSqlProjectFn.java  |   9 +-
 .../sql/schema/BeamSqlRecordType.java         |   3 +
 .../extensions/sql/schema/BeamTableUtils.java |  41 +++----
 .../sql/BeamSqlDslAggregationTest.java        | 107 ++++--------------
 .../sdk/extensions/sql/BeamSqlDslBase.java    |  56 ++-------
 .../extensions/sql/BeamSqlDslProjectTest.java |  48 ++++----
 .../extensions/sql/BeamSqlDslUdfUdafTest.java |   8 +-
 .../beam/sdk/extensions/sql/TestUtils.java    |   6 +-
 .../BeamSqlFnExecutorTestBase.java            |   8 +-
 .../sql/schema/BeamSqlRowCoderTest.java       |  16 +--
 .../schema/kafka/BeamKafkaCSVTableTest.java   |  12 +-
 18 files changed, 130 insertions(+), 281 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index a6200f6228969..6e1b8b6ffca28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -69,14 +70,15 @@ public void encode(BeamRecord value, OutputStream outStream)
   public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
     BitSet nullFields = nullListCoder.decode(inStream);
 
-    BeamRecord record = new BeamRecord(recordType);
+    List<Object> fieldValues = new ArrayList<>();
     for (int idx = 0; idx < recordType.size(); ++idx) {
       if (nullFields.get(idx)) {
-        continue;
+        fieldValues.add(null);
+      } else {
+        fieldValues.add(coderArray.get(idx).decode(inStream));
       }
-
-      record.addField(idx, coderArray.get(idx).decode(inStream));
     }
+    BeamRecord record = new BeamRecord(recordType, fieldValues);
 
     return record;
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index 35a96f6a33816..d4d94a08b39fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
@@ -32,29 +33,28 @@
  */
 @Experimental
 public class BeamRecord implements Serializable {
+  //immutable list of field values.
   private List<Object> dataValues;
   private BeamRecordType dataType;
 
-  public BeamRecord(BeamRecordType dataType) {
+  public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
     this.dataType = dataType;
     this.dataValues = new ArrayList<>();
+
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
     }
-  }
 
-  public BeamRecord(BeamRecordType dataType, List<Object> dataValues) {
-    this(dataType);
-    for (int idx = 0; idx < dataValues.size(); ++idx) {
-      addField(idx, dataValues.get(idx));
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      addField(idx, rawdataValues.get(idx));
     }
   }
 
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  public BeamRecord(BeamRecordType dataType, Object... rawdataValues) {
+    this(dataType, Arrays.asList(rawdataValues));
   }
 
-  public void addField(int index, Object fieldValue) {
+  private void addField(int index, Object fieldValue) {
     dataType.validateValueType(index, fieldValue);
     dataValues.set(index, fieldValue);
   }
@@ -163,10 +163,6 @@ public List<Object> getDataValues() {
     return dataValues;
   }
 
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
   public BeamRecordType getDataType() {
     return dataType;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index fbc1fd8c9a6bd..acb5943e9d9e5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -54,10 +54,7 @@ public static void main(String[] args) throws Exception {
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
     BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);
-    BeamRecord row = new BeamRecord(type);
-    row.addField(0, 1);
-    row.addField(1, "row");
-    row.addField(2, 1.0);
+    BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
 
     //create a source PCollection with Create.of();
     PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index 9e5ce2f66baca..2bd15b3db1bd9 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -255,11 +256,7 @@ private PCollection<BeamRecord> sideInputJoinHelper(
 
   private BeamRecord buildNullRow(BeamRelNode relNode) {
     BeamSqlRecordType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
-    BeamRecord nullRow = new BeamRecord(leftType);
-    for (int i = 0; i < leftType.size(); i++) {
-      nullRow.addField(i, null);
-    }
-    return nullRow;
+    return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null));
   }
 
   private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 8ad6e8dae20fb..57711b3abd407 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,11 +65,12 @@ public BeamValuesRel(
     BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
 
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamRecord row = new BeamRecord(beamSQLRowType);
+      List<Object> fieldsValue = new ArrayList<>();
       for (int i = 0; i < tuple.size(); i++) {
-        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
+        fieldsValue.add(BeamTableUtils.autoCastField(
+            beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
       }
-      rows.add(row);
+      rows.add(new BeamRecord(beamSQLRowType, fieldsValue));
     }
 
     return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index ce5444ffd6856..989720c2caa79 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -75,19 +75,15 @@ public MergeAggregationRecord(BeamSqlRecordType outRowType, List<String>
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamRecord outRecord = new BeamRecord(outRowType);
-
+      List<Object> fieldValues = new ArrayList<>();
       KV<BeamRecord, BeamRecord> kvRecord = c.element();
-      for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
-        outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
-      }
-      for (int idx = 0; idx < aggFieldNames.size(); ++idx) {
-        outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx));
-      }
+      fieldValues.addAll(kvRecord.getKey().getDataValues());
+      fieldValues.addAll(kvRecord.getValue().getDataValues());
 
       if (windowStartFieldIdx != -1) {
-        outRecord.addField(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
+        fieldValues.add(windowStartFieldIdx, ((IntervalWindow) window).start().toDate());
       }
+      BeamRecord outRecord = new BeamRecord(outRowType, fieldValues);
 
       c.output(outRecord);
     }
   }
@@ -111,11 +107,13 @@ public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
     @Override
     public BeamRecord apply(BeamRecord input) {
       BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(BeamSqlRecordHelper.getSqlRecordType(input));
-      BeamRecord keyOfRecord = new BeamRecord(typeOfKey);
+      List<Object> fieldValues = new ArrayList<>();
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
-        keyOfRecord.addField(idx, input.getFieldValue(groupByKeys.get(idx)));
+        fieldValues.add(input.getFieldValue(groupByKeys.get(idx)));
       }
+
+      BeamRecord keyOfRecord = new BeamRecord(typeOfKey, fieldValues);
       return keyOfRecord;
     }
@@ -241,11 +239,11 @@ public AggregationAccumulator mergeAccumulators(Iterable<AggregationAccumulator>
     }
 
     @Override
     public BeamRecord extractOutput(AggregationAccumulator accumulator) {
-      BeamRecord result = new BeamRecord(finalRowType);
+      List<Object> fieldValues = new ArrayList<>();
       for (int idx = 0; idx < aggregators.size(); ++idx) {
-        result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
+        fieldValues.add(aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
       }
-      return result;
+      return new BeamRecord(finalRowType, fieldValues);
     }
 
     @Override
     public Coder<AggregationAccumulator> getAccumulatorCoder(
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 105bbf3d3d18c..f4e846f09731f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -66,12 +66,12 @@ public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumn
       BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
       // build the row
-      BeamRecord row = new BeamRecord(type);
+      List<Object> fieldValues = new ArrayList<>();
       for (int i = 0; i < joinColumns.size(); i++) {
-        row.addField(i, input
+        fieldValues.add(input
             .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
       }
-      return KV.of(row, input);
+      return KV.of(new BeamRecord(type, fieldValues), input);
     }
   }
@@ -154,16 +154,8 @@ private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow,
     types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldsType());
     BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
 
-    BeamRecord row = new BeamRecord(type);
-    // build the row
-    for (int i = 0; i < leftRow.size(); i++) {
-      row.addField(i, leftRow.getFieldValue(i));
-    }
-
-    for (int i = 0; i < rightRow.size(); i++) {
-      row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
-    }
-
-    return row;
+    List<Object> fieldValues = new ArrayList<>(leftRow.getDataValues());
+    fieldValues.addAll(rightRow.getDataValues());
+    return new BeamRecord(type, fieldValues);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
index 45dc621af8823..882ce64588a34 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSqlProjectFn.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.transform;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
@@ -53,12 +54,12 @@ public void setup() {
   public void processElement(ProcessContext c, BoundedWindow window) {
     BeamRecord inputRow = c.element();
     List<Object> results = executor.execute(inputRow, window);
-
-    BeamRecord outRow = new BeamRecord(outputRowType);
-
+    List<Object> fieldsValue = new ArrayList<>();
     for (int idx = 0; idx < results.size(); ++idx) {
-      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
+      fieldsValue.add(
+          BeamTableUtils.autoCastField(outputRowType.getFieldsType().get(idx), results.get(idx)));
     }
+    BeamRecord outRow = new BeamRecord(outputRowType, fieldsValue);
 
     c.output(outRow);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
index fe82834e8477d..5c12b105db922 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRecordType.java
@@ -84,6 +84,9 @@ private BeamSqlRecordType(List<String> fieldsName, List<Integer> fieldsType
 
   public static BeamSqlRecordType create(List<String> fieldNames,
       List<Integer> fieldTypes) {
+    if (fieldNames.size() != fieldTypes.size()) {
+      throw new IllegalStateException("the sizes of 'fieldNames' and 'fieldTypes' must match.");
+    }
     List<Coder> fieldCoders = new ArrayList<>();
     for (int idx = 0; idx < fieldTypes.size(); ++idx) {
       switch (fieldTypes.get(idx)) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 63c9720a49d62..4029e237dec0e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -22,6 +22,8 @@
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -39,7 +41,7 @@ public static BeamRecord csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
       BeamSqlRecordType beamSqlRowType) {
-    BeamRecord row = new BeamRecord(beamSqlRowType);
+    List<Object> fieldsValue = new ArrayList<>();
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -52,13 +54,13 @@ public static BeamRecord csvLine2BeamSqlRow(
       } else {
         for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
           String raw = rawRecord.get(idx);
-          addFieldWithAutoTypeCasting(row, idx, raw);
+          fieldsValue.add(autoCastField(beamSqlRowType.getFieldsType().get(idx), raw));
         }
       }
     } catch (IOException e) {
       throw new IllegalArgumentException("decodeRecord failed!", e);
     }
-    return row;
+    return new BeamRecord(beamSqlRowType, fieldsValue);
   }
 
   public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
@@ -74,37 +76,29 @@ public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object rawObj) {
+  public static Object autoCastField(int fieldType, Object rawObj) {
     if (rawObj == null) {
-      row.addField(idx, null);
-      return;
+      return null;
     }
 
-    SqlTypeName columnType = CalciteUtils.getFieldType(BeamSqlRecordHelper.getSqlRecordType(row)
-        , idx);
+    SqlTypeName columnType = CalciteUtils.toCalciteType(fieldType);
     // auto-casting for numberics
     if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
         || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
       String raw = rawObj.toString();
       switch (columnType) {
         case TINYINT:
-          row.addField(idx, Byte.valueOf(raw));
-          break;
+          return Byte.valueOf(raw);
         case SMALLINT:
-          row.addField(idx, Short.valueOf(raw));
-          break;
+          return Short.valueOf(raw);
        case INTEGER:
-          row.addField(idx, Integer.valueOf(raw));
-          break;
+          return Integer.valueOf(raw);
        case BIGINT:
-          row.addField(idx, Long.valueOf(raw));
-          break;
+          return Long.valueOf(raw);
        case FLOAT:
-          row.addField(idx, Float.valueOf(raw));
-          break;
+          return Float.valueOf(raw);
        case DOUBLE:
-          row.addField(idx, Double.valueOf(raw));
-          break;
+          return Double.valueOf(raw);
        default:
          throw new UnsupportedOperationException(
              String.format("Column type %s is not supported yet!", columnType));
@@ -112,13 +106,12 @@ public static void addFieldWithAutoTypeCasting(BeamRecord row, int idx, Object r
     } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
       // convert NlsString to String
       if (rawObj instanceof NlsString) {
-        row.addField(idx, ((NlsString) rawObj).getValue());
+        return ((NlsString) rawObj).getValue();
       } else {
-        row.addField(idx, rawObj);
+        return rawObj;
       }
     } else {
-      // keep the origin
-      row.addField(idx, rawObj);
+      return rawObj;
     }
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index 71278ecc090e7..19ca3980813ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -57,9 +57,7 @@ private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws E
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
+    BeamRecord record = new BeamRecord(resultType, 0, 4L);
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -107,37 +105,14 @@ private void runAggregationFunctions(PCollection<BeamRecord> input) throws Excep
             Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE,
             Types.DOUBLE, Types.DOUBLE, Types.TIMESTAMP, Types.TIMESTAMP));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("size", 4L);
-
-    record.addField("sum1", 10000L);
-    record.addField("avg1", 2500L);
-    record.addField("max1", 4000L);
-    record.addField("min1", 1000L);
-
-    record.addField("sum2", (short) 10);
-    record.addField("avg2", (short) 2);
-    record.addField("max2", (short) 4);
-    record.addField("min2", (short) 1);
-
-    record.addField("sum3", (byte) 10);
-    record.addField("avg3", (byte) 2);
-    record.addField("max3", (byte) 4);
-    record.addField("min3", (byte) 1);
-
-    record.addField("sum4", 10.0F);
-    record.addField("avg4", 2.5F);
-    record.addField("max4", 4.0F);
-    record.addField("min4", 1.0F);
-
-    record.addField("sum5", 10.0);
-    record.addField("avg5", 2.5);
-    record.addField("max5", 4.0);
-    record.addField("min5", 1.0);
-
-    record.addField("max6", FORMAT.parse("2017-01-01 02:04:03"));
-    record.addField("min6", FORMAT.parse("2017-01-01 01:01:03"));
+    BeamRecord record = new BeamRecord(resultType
+        , 0, 4L
+        , 10000L, 2500L, 4000L, 1000L
+        , (short) 10, (short) 2, (short) 4, (short) 1
+        , (byte) 10, (byte) 2, (byte) 4, (byte) 1
+        , 10.0F, 2.5F, 4.0F, 1.0F
+        , 10.0, 2.5, 4.0, 1.0
+        , FORMAT.parse("2017-01-01 02:04:03"), FORMAT.parse("2017-01-01 01:01:03"));
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -169,21 +144,10 @@ private void runDistinct(PCollection<BeamRecord> input) throws Exception {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", 1);
-    record1.addField("f_long", 1000L);
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", 2);
-    record2.addField("f_long", 2000L);
-
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", 3);
-    record3.addField("f_long", 3000L);
-
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", 4);
-    record4.addField("f_long", 4000L);
+    BeamRecord record1 = new BeamRecord(resultType, 1, 1000L);
+    BeamRecord record2 = new BeamRecord(resultType, 2, 2000L);
+    BeamRecord record3 = new BeamRecord(resultType, 3, 3000L);
+    BeamRecord record4 = new BeamRecord(resultType, 4, 4000L);
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -219,15 +183,8 @@ private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 
@@ -262,25 +219,10 @@ private void runHopWindow(PCollection<BeamRecord> input) throws Exception {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 3L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00"));
-
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int2", 0);
-    record3.addField("size", 1L);
-    record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00"));
-
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int2", 0);
-    record4.addField("size", 1L);
-    record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 00:30:00"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:00:00"));
+    BeamRecord record3 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 01:30:00"));
+    BeamRecord record4 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:00:00"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -316,15 +258,8 @@ private void runSessionWindow(PCollection<BeamRecord> input) throws Exception {
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int2", 0);
-    record1.addField("size", 3L);
-    record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03"));
-
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int2", 0);
-    record2.addField("size", 1L);
-    record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03"));
+    BeamRecord record1 = new BeamRecord(resultType, 0, 3L, FORMAT.parse("2017-01-01 01:01:03"));
+    BeamRecord record2 = new BeamRecord(resultType, 0, 1L, FORMAT.parse("2017-01-01 02:04:03"));
 
     PAssert.that(result).containsInAnyOrder(record1, record2);
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
index d09caf04f73ba..02427ae4db2fe 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java
@@ -112,56 +112,24 @@ private PCollection<BeamRecord> prepareUnboundedPCollection2() {
   private static List<BeamRecord> prepareInputRowsInTableA() throws ParseException{
     List<BeamRecord> rows = new ArrayList<>();
 
-    BeamRecord row1 = new BeamRecord(rowTypeInTableA);
-    row1.addField(0, 1);
-    row1.addField(1, 1000L);
-    row1.addField(2, Short.valueOf("1"));
-    row1.addField(3, Byte.valueOf("1"));
-    row1.addField(4, 1.0f);
-    row1.addField(5, 1.0);
-    row1.addField(6, "string_row1");
-    row1.addField(7, FORMAT.parse("2017-01-01 01:01:03"));
-    row1.addField(8, 0);
-    row1.addField(9, new BigDecimal(1));
+    BeamRecord row1 = new BeamRecord(rowTypeInTableA
+        , 1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0f, 1.0, "string_row1"
+        , FORMAT.parse("2017-01-01 01:01:03"), 0, new BigDecimal(1));
     rows.add(row1);
 
-    BeamRecord row2 = new BeamRecord(rowTypeInTableA);
-    row2.addField(0, 2);
-    row2.addField(1, 2000L);
-    row2.addField(2, Short.valueOf("2"));
-    row2.addField(3, Byte.valueOf("2"));
-    row2.addField(4, 2.0f);
-    row2.addField(5, 2.0);
-    row2.addField(6, "string_row2");
-    row2.addField(7, FORMAT.parse("2017-01-01 01:02:03"));
-    row2.addField(8, 0);
-    row2.addField(9, new BigDecimal(2));
+    BeamRecord row2 = new BeamRecord(rowTypeInTableA
+        , 2, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0f, 2.0, "string_row2"
+        , FORMAT.parse("2017-01-01 01:02:03"), 0, new BigDecimal(2));
     rows.add(row2);
 
-    BeamRecord row3 = new BeamRecord(rowTypeInTableA);
-    row3.addField(0, 3);
-    row3.addField(1, 3000L);
-    row3.addField(2, Short.valueOf("3"));
-    row3.addField(3, Byte.valueOf("3"));
-    row3.addField(4, 3.0f);
-    row3.addField(5, 3.0);
-    row3.addField(6, "string_row3");
-    row3.addField(7, FORMAT.parse("2017-01-01 01:06:03"));
-    row3.addField(8, 0);
-    row3.addField(9, new BigDecimal(3));
+    BeamRecord row3 = new BeamRecord(rowTypeInTableA
+        , 3, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0f, 3.0, "string_row3"
+        , FORMAT.parse("2017-01-01 01:06:03"), 0, new BigDecimal(3));
     rows.add(row3);
 
-    BeamRecord row4 = new BeamRecord(rowTypeInTableA);
-    row4.addField(0, 4);
-    row4.addField(1, 4000L);
-    row4.addField(2, Short.valueOf("4"));
-    row4.addField(3, Byte.valueOf("4"));
-    row4.addField(4, 4.0f);
-    row4.addField(5, 4.0);
-    row4.addField(6, "string_row4");
-    row4.addField(7, FORMAT.parse("2017-01-01 02:04:03"));
-    row4.addField(8, 0);
-    row4.addField(9, new BigDecimal(4));
+    BeamRecord row4 = new BeamRecord(rowTypeInTableA
+        , 4, 4000L, Short.valueOf("4"), Byte.valueOf("4"), 4.0f, 4.0, "string_row4"
+        , FORMAT.parse("2017-01-01 02:04:03"), 0, new BigDecimal(4));
     rows.add(row4);
 
     return rows;
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
index ddb90d5bbf1a0..c8041a8259f2b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java
@@ -84,9 +84,8 @@ private void runPartialFields(PCollection<BeamRecord> input) throws Exception {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record);
 
@@ -119,21 +118,17 @@ private void runPartialFieldsInMultipleRow(PCollection<BeamRecord> input) throws
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record1 = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
-    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+    BeamRecord record2 = new BeamRecord(resultType
+        , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
 
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
-    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+    BeamRecord record3 = new BeamRecord(resultType
+        , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
 
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
-    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+    BeamRecord record4 = new BeamRecord(resultType
+        , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -166,21 +161,17 @@ private void runPartialFieldsInRows(PCollection<BeamRecord> input) throws Except
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
-    BeamRecord record1 = new BeamRecord(resultType);
-    record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0));
-    record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1));
+    BeamRecord record1 = new BeamRecord(resultType
+        , recordsInTableA.get(0).getFieldValue(0), recordsInTableA.get(0).getFieldValue(1));
 
-    BeamRecord record2 = new BeamRecord(resultType);
-    record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0));
-    record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1));
+    BeamRecord record2 = new BeamRecord(resultType
+        , recordsInTableA.get(1).getFieldValue(0), recordsInTableA.get(1).getFieldValue(1));
 
-    BeamRecord record3 = new BeamRecord(resultType);
-    record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0));
-    record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1));
+    BeamRecord record3 = new BeamRecord(resultType
+        , recordsInTableA.get(2).getFieldValue(0), recordsInTableA.get(2).getFieldValue(1));
 
-    BeamRecord record4 = new BeamRecord(resultType);
-    record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0));
-    record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1));
+    BeamRecord record4 = new BeamRecord(resultType
+        , recordsInTableA.get(3).getFieldValue(0), recordsInTableA.get(3).getFieldValue(1));
 
     PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4);
 
@@ -213,8 +204,7 @@ public void runLiteralField(PCollection<BeamRecord> input) throws Exception {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("literal_field", 1);
+    BeamRecord record = new BeamRecord(resultType, 1);
 
     PAssert.that(result).containsInAnyOrder(record);
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index e3c6aecf35f02..25e76e9ef0e66 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -42,9 +42,7 @@ public void testUdaf() throws Exception {
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int2", 0);
-    record.addField("squaresum", 30);
+    BeamRecord record = new BeamRecord(resultType, 0, 30);
 
     String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`"
         + " FROM PCOLLECTION GROUP BY f_int2";
@@ -72,9 +70,7 @@ public void testUdf() throws Exception{
     BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
-    BeamRecord record = new BeamRecord(resultType);
-    record.addField("f_int", 2);
-    record.addField("cubicvalue", 8);
+    BeamRecord record = new BeamRecord(resultType, 2, 8);
 
     String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2";
     PCollection<BeamRecord> result1 =
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 63b6ca82a5545..e9dc88f1a8721 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -184,11 +184,7 @@ public static List<BeamRecord> buildRows(BeamSqlRecordType type, List<Object> args) {
     int fieldCount = type.size();
 
     for (int i = 0; i < args.size(); i += fieldCount) {
-      BeamRecord row = new BeamRecord(type);
-      for (int j = 0; j < fieldCount; j++) {
-        row.addField(j, args.get(i + j));
-      }
-      rows.add(row);
+      rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));
     }
     return rows;
   }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
index 4da77904b6a86..86e2ca442399f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutorTestBase.java
@@ -71,12 +71,8 @@ public static void prepare() {
         .add("order_time", SqlTypeName.BIGINT).build();
 
     beamRowType = CalciteUtils.toBeamRowType(relDataType);
-    record = new BeamRecord(beamRowType);
-
-    record.addField(0, 1234567L);
-    record.addField(1, 0);
-    record.addField(2, 8.9);
-    record.addField(3, 1234567L);
+    record = new BeamRecord(beamRowType
+        , 1234567L, 0, 8.9, 1234567L);
 
     SchemaPlus schema = Frameworks.createRootSchema(true);
     final List<RelTraitDef> traitDefs = new ArrayList<>();
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
index 08f98c345d741..7492434cc46b9 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoderTest.java
@@ -62,20 +62,12 @@ public RelDataType apply(RelDataTypeFactory a0) {
     BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(
         protoRowType.apply(new JavaTypeFactoryImpl(
             RelDataTypeSystem.DEFAULT)));
-    BeamRecord row = new BeamRecord(beamSQLRowType);
-    row.addField("col_tinyint", Byte.valueOf("1"));
-    row.addField("col_smallint", Short.valueOf("1"));
-    row.addField("col_integer", 1);
-    row.addField("col_bigint", 1L);
-    row.addField("col_float", 1.1F);
-    row.addField("col_double", 1.1);
-    row.addField("col_decimal", BigDecimal.ZERO);
-    row.addField("col_string_varchar", "hello");
+
     GregorianCalendar calendar = new GregorianCalendar();
     calendar.setTime(new Date());
-    row.addField("col_time", calendar);
-    row.addField("col_timestamp", new Date());
-    row.addField("col_boolean", true);
+    BeamRecord row = new BeamRecord(beamSQLRowType
+        , Byte.valueOf("1"), Short.valueOf("1"), 1, 1L, 1.1F, 1.1
+        , BigDecimal.ZERO, "hello", calendar, new Date(), true);
 
     BeamRecordCoder coder = beamSQLRowType.getRecordCoder();
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 2fc013d3d8d7d..cb6121a75788e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -45,18 +45,14 @@ public class BeamKafkaCSVTableTest {
   @Rule
   public TestPipeline pipeline = TestPipeline.create();
 
-  public static BeamRecord row1 = new BeamRecord(genRowType());
-  public static BeamRecord row2 = new BeamRecord(genRowType());
+  public static BeamRecord row1;
+  public static BeamRecord row2;
 
   @BeforeClass
   public static void setUp() {
-    row1.addField(0, 1L);
-    row1.addField(1, 1);
-    row1.addField(2, 1.0);
+    row1 = new BeamRecord(genRowType(), 1L, 1, 1.0);
 
-    row2.addField(0, 2L);
-    row2.addField(1, 2);
-    row2.addField(2, 2.0);
+    row2 = new BeamRecord(genRowType(), 2L, 2, 2.0);
   }
 
   @Test public void testCsvRecorderDecoder() throws Exception {

From 86dee72eec73049f7a5954604fa7bb4888082aaf Mon Sep 17 00:00:00 2001
From: mingmxu
Date: Tue, 8 Aug 2017 14:38:03 -0700
Subject: [PATCH 2/2] fixup

---
 .../main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java   | 2 +-
 .../src/main/java/org/apache/beam/sdk/values/BeamRecord.java    | 2 +-
 .../apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java  | 2 +-
 .../apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java   | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
index 6e1b8b6ffca28..4e24b82e6ac86 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java
@@ -70,7 +70,7 @@ public void encode(BeamRecord value, OutputStream outStream)
   public BeamRecord decode(InputStream inStream) throws CoderException, IOException {
     BitSet nullFields = nullListCoder.decode(inStream);
 
-    List<Object> fieldValues = new ArrayList<>();
+    List<Object> fieldValues = new ArrayList<>(recordType.size());
     for (int idx = 0; idx < recordType.size(); ++idx) {
       if (nullFields.get(idx)) {
         fieldValues.add(null);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
index d4d94a08b39fb..6e4bd4cd2ac2b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java
@@ -39,7 +39,7 @@ public class BeamRecord implements Serializable {
 
   public BeamRecord(BeamRecordType dataType, List<Object> rawdataValues) {
     this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
+    this.dataValues = new ArrayList<>(dataType.size());
 
     for (int idx = 0; idx < dataType.size(); ++idx) {
       dataValues.add(null);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
index 57711b3abd407..1d666cab5b82d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java
@@ -65,7 +65,7 @@ public BeamValuesRel(
     BeamSqlRecordType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
 
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      List<Object> fieldsValue = new ArrayList<>();
+      List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size());
       for (int i = 0; i < tuple.size(); i++) {
         fieldsValue.add(BeamTableUtils.autoCastField(
             beamSQLRowType.getFieldsType().get(i), tuple.get(i).getValue()));
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
index 4029e237dec0e..19d3e3982023a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -41,7 +41,7 @@ public static BeamRecord csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
       BeamSqlRecordType beamSqlRowType) {
-    List<Object> fieldsValue = new ArrayList<>();
+    List<Object> fieldsValue = new ArrayList<>(beamSqlRowType.size());
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
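
Note: for reference only (not part of either patch), a minimal sketch of the construction-side API once BeamRecord is immutable. It reuses the field names and types from BeamSqlExample above and assumes java.sql.Types constants; the class name ImmutableBeamRecordSketch is hypothetical.

// Hypothetical usage sketch -- not part of the patch.
import java.sql.Types;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.values.BeamRecord;

public class ImmutableBeamRecordSketch {
  public static void main(String[] args) {
    List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
    BeamSqlRecordType type = BeamSqlRecordType.create(fieldNames, fieldTypes);

    // Every field value is supplied at construction time; addField() is private after
    // this change, so a record can no longer be mutated once built.
    BeamRecord viaVarargs = new BeamRecord(type, 1, "row", 1.0);
    BeamRecord viaList = new BeamRecord(type, Arrays.<Object>asList(1, "row", 1.0));

    System.out.println(viaVarargs.getDataValues());
    System.out.println(viaList.getDataValues());
  }
}

With the check added to BeamSqlRecordType.create, a mismatched fieldNames/fieldTypes pair now fails fast with an IllegalStateException instead of producing a half-built type.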
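Similarly, an illustrative sketch (the buildRow helper is hypothetical) of the collect-then-construct pattern the patches apply in BeamValuesRel, BeamSqlProjectFn and BeamTableUtils: autoCastField now returns the coerced value instead of writing into a mutable row, and the record is created once after all values are gathered.

// Hypothetical helper mirroring the pattern used in the patches above.
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRecordType;
import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
import org.apache.beam.sdk.values.BeamRecord;

public class AutoCastSketch {
  static BeamRecord buildRow(BeamSqlRecordType rowType, List<Object> rawValues) {
    // Pre-size the list, as the fixup commit (PATCH 2/2) does.
    List<Object> fieldsValue = new ArrayList<>(rowType.size());
    for (int idx = 0; idx < rowType.size(); idx++) {
      // autoCastField(fieldType, rawObj) returns the value coerced to the column's
      // SQL type, e.g. the String "1" becomes an Integer for an INTEGER column.
      fieldsValue.add(BeamTableUtils.autoCastField(
          rowType.getFieldsType().get(idx), rawValues.get(idx)));
    }
    return new BeamRecord(rowType, fieldsValue);
  }
}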