From df4db0980b01662882e6df89b50b96a58d2b1e6f Mon Sep 17 00:00:00 2001 From: mingmxu Date: Wed, 9 Aug 2017 13:30:16 -0700 Subject: [PATCH] update JavaDoc for BeamRecord, BeamRecordType, quickfix for BeamSqlUdfExpression fixup, ImmutableList cannot accept NULL values. --- .../apache/beam/sdk/values/BeamRecord.java | 118 +++++++++++++++++- .../beam/sdk/values/BeamRecordType.java | 24 +++- .../operator/BeamSqlUdfExpression.java | 5 +- 3 files changed, 141 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index a3ede3cc32731..c29b89c14049d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -25,11 +25,17 @@ import java.util.GregorianCalendar; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.BeamRecordCoder; /** - * {@link org.apache.beam.sdk.values.BeamRecord}, self-described with - * {@link BeamRecordType}, represents one element in a - * {@link org.apache.beam.sdk.values.PCollection}. + * {@link BeamRecord} is an immutable tuple-like type to represent one element in a + * {@link PCollection}. The fields are described with a {@link BeamRecordType}. + * + *

By default, {@link BeamRecordType} only contains the name for each field. It + * can be extended to support more sophisticated validation by overwriting + * {@link BeamRecordType#validateValueType(int, Object)}. + * + *

A Coder {@link BeamRecordCoder} is provided, which wraps the Coder for each data field. */ @Experimental public class BeamRecord implements Serializable { @@ -63,6 +69,9 @@ public BeamRecord(BeamRecordType dataType, List rawDataValues) { } } + /** + * see {@link #BeamRecord(BeamRecordType, List)}. + */ public BeamRecord(BeamRecordType dataType, Object... rawdataValues) { this(dataType, Arrays.asList(rawdataValues)); } @@ -72,110 +81,213 @@ private void addField(int index, Object fieldValue) { dataValues.set(index, fieldValue); } + /** + * Get value by field name. + */ public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldNames().indexOf(fieldName)); } + /** + * Get a {@link Byte} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Byte getByte(String fieldName) { return (Byte) getFieldValue(fieldName); } + /** + * Get a {@link Short} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Short getShort(String fieldName) { return (Short) getFieldValue(fieldName); } + /** + * Get a {@link Integer} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Integer getInteger(String fieldName) { return (Integer) getFieldValue(fieldName); } + /** + * Get a {@link Float} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Float getFloat(String fieldName) { return (Float) getFieldValue(fieldName); } + /** + * Get a {@link Double} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Double getDouble(String fieldName) { return (Double) getFieldValue(fieldName); } + /** + * Get a {@link Long} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Long getLong(String fieldName) { return (Long) getFieldValue(fieldName); } + /** + * Get a {@link String} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public String getString(String fieldName) { return (String) getFieldValue(fieldName); } + /** + * Get a {@link Date} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Date getDate(String fieldName) { return (Date) getFieldValue(fieldName); } + /** + * Get a {@link GregorianCalendar} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public GregorianCalendar getGregorianCalendar(String fieldName) { return (GregorianCalendar) getFieldValue(fieldName); } + /** + * Get a {@link BigDecimal} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public BigDecimal getBigDecimal(String fieldName) { return (BigDecimal) getFieldValue(fieldName); } + /** + * Get a {@link Boolean} value by field name, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Boolean getBoolean(String fieldName) { return (Boolean) getFieldValue(fieldName); } + /** + * Get value by field index. + */ public Object getFieldValue(int fieldIdx) { return dataValues.get(fieldIdx); } + /** + * Get a {@link Byte} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Byte getByte(int idx) { return (Byte) getFieldValue(idx); } + /** + * Get a {@link Short} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Short getShort(int idx) { return (Short) getFieldValue(idx); } + /** + * Get a {@link Integer} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Integer getInteger(int idx) { return (Integer) getFieldValue(idx); } + /** + * Get a {@link Float} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Float getFloat(int idx) { return (Float) getFieldValue(idx); } + /** + * Get a {@link Double} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Double getDouble(int idx) { return (Double) getFieldValue(idx); } + /** + * Get a {@link Long} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Long getLong(int idx) { return (Long) getFieldValue(idx); } + /** + * Get a {@link String} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public String getString(int idx) { return (String) getFieldValue(idx); } + /** + * Get a {@link Date} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Date getDate(int idx) { return (Date) getFieldValue(idx); } + /** + * Get a {@link GregorianCalendar} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public GregorianCalendar getGregorianCalendar(int idx) { return (GregorianCalendar) getFieldValue(idx); } + /** + * Get a {@link BigDecimal} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public BigDecimal getBigDecimal(int idx) { return (BigDecimal) getFieldValue(idx); } + /** + * Get a {@link Boolean} value by field index, {@link ClassCastException} is thrown + * if type doesn't match. + */ public Boolean getBoolean(int idx) { return (Boolean) getFieldValue(idx); } + /** + * Return the size of data fields. + */ public int size() { return dataValues.size(); } + /** + * Return the list of data values. + */ public List getDataValues() { return dataValues; } + /** + * Return {@link BeamRecordType} which describes the fields. + */ public BeamRecordType getDataType() { return dataType; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java index 6ab783cc1c229..101beaca80892 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java @@ -25,14 +25,22 @@ import org.apache.beam.sdk.coders.Coder; /** - * The default type provider used in {@link BeamRecord}. + * {@link BeamRecordType} describes the fields in {@link BeamRecord}, extra checking can be added + * by overwriting {@link BeamRecordType#validateValueType(int, Object)}. */ @Experimental public class BeamRecordType implements Serializable{ private List fieldNames; private List fieldCoders; + /** + * Create a {@link BeamRecordType} with a name and Coder for each field. + */ public BeamRecordType(List fieldNames, List fieldCoders) { + if (fieldNames.size() != fieldCoders.size()) { + throw new IllegalStateException( + "the size of fieldNames and fieldCoders need to be the same."); + } this.fieldNames = fieldNames; this.fieldCoders = fieldCoders; } @@ -47,24 +55,36 @@ public void validateValueType(int index, Object fieldValue) } /** - * Get the coder for {@link BeamRecordCoder}. + * Return the coder for {@link BeamRecord}, which wraps {@link #fieldCoders} for each field. */ public BeamRecordCoder getRecordCoder(){ return BeamRecordCoder.of(this, fieldCoders); } + /** + * Returns an immutable list of field names. + */ public List getFieldNames(){ return ImmutableList.copyOf(fieldNames); } + /** + * Return the name of field by index. + */ public String getFieldNameByIndex(int index){ return fieldNames.get(index); } + /** + * Find the index of a given field. + */ public int findIndexOfField(String fieldName){ return fieldNames.indexOf(fieldName); } + /** + * Return the count of fields. + */ public int size(){ return fieldNames.size(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java index 123e6a0a73efe..625de2c0e7f9f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; import java.lang.reflect.Method; +import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -79,8 +80,10 @@ private void reConstructMethod() { for (String pc : paraClassName) { paraClass.add(Class.forName(pc)); } - udfIns = Class.forName(className).newInstance(); method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class[] {})); + if (!Modifier.isStatic(method.getModifiers())) { + udfIns = Class.forName(className).newInstance(); + } } catch (Exception e) { throw new RuntimeException(e); }