From 8aab4c16ea7cf17ff938b97453bf9be0cdde33e6 Mon Sep 17 00:00:00 2001 From: Gaurangi Saxena Date: Wed, 20 May 2020 15:49:01 -0700 Subject: [PATCH 01/16] move SchemaConverters to scala --- .../spark/bigquery/SchemaConverters.java | 188 ++++++++++++++++++ .../spark/bigquery/AvroBinaryIterator.scala | 7 +- .../spark/bigquery/SchemaConverters.scala | 151 -------------- .../spark/bigquery/direct/BigQueryRDD.scala | 2 +- .../it/SparkBigQueryEndToEndITSuite.scala | 1 - 5 files changed, 194 insertions(+), 155 deletions(-) create mode 100644 connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java delete mode 100644 connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java b/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java new file mode 100644 index 000000000..471250049 --- /dev/null +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java @@ -0,0 +1,188 @@ +package com.google.cloud.spark.bigquery; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.LegacySQLTypeName; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +public class SchemaConverters { + // Numeric is a fixed precision Decimal Type with 38 digits of precision and 9 digits of scale. + // See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + private final static int BQ_NUMERIC_PRECISION = 38; + private final static int BQ_NUMERIC_SCALE = 9; + private final static DecimalType NUMERIC_SPARK_TYPE = DataTypes.createDecimalType( + BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE); + + /** Convert a BigQuery schema to a Spark schema */ + public static StructType toSpark(Schema schema) { + List fieldList = schema.getFields().stream() + .map(SchemaConverters::convert).collect(Collectors.toList()); + StructType structType = new StructType(fieldList.toArray(new StructField[0])); + + return structType; + } + + public static InternalRow createRowConverter(Schema schema, List namesInOrder, GenericRecord record) { + return convertAll(schema.getFields(), record, namesInOrder); + } + + static Object convert(Field field, Object value) { + if (value == null) { + return null; + } + + if (field.getMode() == Field.Mode.REPEATED) { + // rather than recurring down we strip off the repeated mode + // Due to serialization issues, reconstruct the type using reflection: + // See: https://github.com/googleapis/google-cloud-java/issues/3942 + LegacySQLTypeName fType = LegacySQLTypeName.valueOfStrict(field.getType().name()); + Field nestedField = Field.newBuilder(field.getName(), fType, field.getSubFields()) + // As long as this is not repeated it works, but technically arrays cannot contain + // nulls, so select required instead of nullable. 
+ .setMode(Field.Mode.REQUIRED) + .build(); + + List valueList = (List) value; + + return new GenericArrayData(valueList.stream().map(v -> convert(nestedField, v)).collect(Collectors.toList())); + } + + if (LegacySQLTypeName.INTEGER.equals(field.getType()) || + LegacySQLTypeName.FLOAT.equals(field.getType()) || + LegacySQLTypeName.BOOLEAN.equals(field.getType()) || + LegacySQLTypeName.DATE.equals(field.getType()) || + LegacySQLTypeName.TIME.equals(field.getType()) || + LegacySQLTypeName.TIMESTAMP.equals(field.getType())) { + return value; + } + + if (LegacySQLTypeName.STRING.equals(field.getType()) || + LegacySQLTypeName.DATETIME.equals(field.getType()) || + LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) { + return UTF8String.fromBytes(((Utf8)value).getBytes()); + } + + if (LegacySQLTypeName.BYTES.equals(field.getType())) { + return getBytes((ByteBuffer)value); + } + + if (LegacySQLTypeName.NUMERIC.equals(field.getType())) { + byte[] bytes = getBytes((ByteBuffer)value); + BigDecimal b = new BigDecimal(new BigInteger(bytes), BQ_NUMERIC_SCALE); + Decimal d = Decimal.apply(b, BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE); + + return d; + } + + if (LegacySQLTypeName.RECORD.equals(field.getType())) { + return convertAll(field.getSubFields(), + (GenericRecord)value, + field.getSubFields().stream().map(f -> f.getName()).collect(Collectors.toList())); + } + + throw new IllegalStateException("Unexpected type: " + field.getType()); + } + + private static byte[] getBytes(ByteBuffer buf) { + byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + + return bytes; + } + + // Schema is not recursive so add helper for sequence of fields + static GenericInternalRow convertAll(FieldList fieldList, + GenericRecord record, + List namesInOrder) { + + + Map fieldMap = new HashMap<>(); + + fieldList.stream().forEach(field -> + fieldMap.put(field.getName(), convert(field, record.get(field.getName())))); + + Object[] values = new Object[namesInOrder.size()]; + for (int i = 0; i < namesInOrder.size(); i++) { + values[i] = fieldMap.get(namesInOrder.get(i)); + } + + return new GenericInternalRow(values); + } + + /** + * Create a function that converts an Avro row with the given BigQuery schema to a Spark SQL row + * + * The conversion is based on the BigQuery schema, not Avro Schema, because the Avro schema is + * very painful to use. + * + * Not guaranteed to be stable across all versions of Spark. 
+ */ + + private static StructField convert(Field field) { + DataType dataType = getDataType(field); + boolean nullable = true; + + if (field.getMode() == Field.Mode.REQUIRED) { + nullable = false; + } else if (field.getMode() == Field.Mode.REPEATED) { + dataType = new ArrayType(dataType, true); + } + + MetadataBuilder metadata = new MetadataBuilder(); + if (field.getDescription() != null) { + metadata.putString("description", field.getDescription()); + } + + return new StructField(field.getName(), dataType, nullable, metadata.build()); + } + + private static DataType getDataType(Field field) { + + if (LegacySQLTypeName.INTEGER.equals(field.getType())) { + return DataTypes.LongType; + } else if (LegacySQLTypeName.FLOAT.equals(field.getType())) { + return DataTypes.DoubleType; + } else if (LegacySQLTypeName.NUMERIC.equals(field.getType())) { + return NUMERIC_SPARK_TYPE; + } else if (LegacySQLTypeName.STRING.equals(field.getType())) { + return DataTypes.StringType; + } else if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) { + return DataTypes.BooleanType; + } else if (LegacySQLTypeName.BYTES.equals(field.getType())) { + return DataTypes.BinaryType; + } else if (LegacySQLTypeName.DATE.equals(field.getType())) { + return DataTypes.DateType; + } else if (LegacySQLTypeName.TIMESTAMP.equals(field.getType())) { + return DataTypes.TimestampType; + } else if (LegacySQLTypeName.TIME.equals(field.getType())) { + return DataTypes.LongType; + // TODO(#5): add a timezone to allow parsing to timestamp + // This can be safely cast to TimestampType, but doing so causes the date to be inferred + // as the current date. It's safer to leave as a stable string and give the user the + // option of casting themselves. + } else if (LegacySQLTypeName.DATETIME.equals(field.getType())) { + return DataTypes.StringType; + } else if (LegacySQLTypeName.RECORD.equals(field.getType())) { + List structFields = field.getSubFields().stream().map(SchemaConverters::convert).collect(Collectors.toList()); + return new StructType(structFields.toArray(new StructField[0])); + } else if (LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) { + return DataTypes.StringType; + } else { + throw new IllegalStateException("Unexpected type: " + field.getType()); + } + } +} diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala index 99e0c6336..b49505223 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala @@ -22,6 +22,8 @@ import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.avro.{Schema => AvroSchema} import org.apache.spark.sql.catalyst.InternalRow +import scala.collection.JavaConverters + /** * An iterator for scanning over rows serialized in Avro format * @param bqSchema Schema of underlying BigQuery source @@ -34,11 +36,12 @@ class AvroBinaryIterator(bqSchema: Schema, schema: AvroSchema, rowsInBytes: ByteString) extends Iterator[InternalRow] { - private lazy val converter = SchemaConverters.createRowConverter(bqSchema, columnsInOrder) _ val reader = new GenericDatumReader[GenericRecord](schema) + val columnsInOrderList = JavaConverters.seqAsJavaListConverter(columnsInOrder).asJava val in: BinaryDecoder = new DecoderFactory().binaryDecoder(rowsInBytes.toByteArray, null) override def hasNext: Boolean = !in.isEnd - override def next(): InternalRow = 
converter(reader.read(null, in)) + override def next(): InternalRow = SchemaConverters.createRowConverter(bqSchema, + columnsInOrderList, reader.read(null, in)) } diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala deleted file mode 100644 index cf657246e..000000000 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2018 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.spark.bigquery - -import java.nio.ByteBuffer - -import com.google.cloud.bigquery.LegacySQLTypeName._ -import com.google.cloud.bigquery.{Field, LegacySQLTypeName, Schema} -import org.apache.avro.generic.GenericRecord -import org.apache.avro.util.Utf8 -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.GenericArrayData -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -import scala.collection.JavaConverters._ - -/** Stateless converters for Converting between Spark and BigQuery types. */ -object SchemaConverters extends Logging { - - // Numeric is a fixed precision Decimal Type with 38 digits of precision and 9 digits of scale. - // See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type - private val BQ_NUMERIC_PRECISION = 38 - private val BQ_NUMERIC_SCALE = 9 - private lazy val NUMERIC_SPARK_TYPE = DataTypes.createDecimalType( - BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE) - - /** Convert a BigQuery schema to a Spark schema */ - def toSpark(schema: Schema): StructType = { - def convert(field: Field): StructField = { - var dataType = field.getType match { - // TODO(#1): Support NUMERIC - case INTEGER => LongType - case FLOAT => DoubleType - case NUMERIC => NUMERIC_SPARK_TYPE - case STRING => StringType - case BOOLEAN => BooleanType - case BYTES => BinaryType - case DATE => DateType - case TIMESTAMP => TimestampType - case TIME => LongType - // TODO(#5): add a timezone to allow parsing to timestamp - // This can be safely cast to TimestampType, but doing so causes the date to be inferred - // as the current date. It's safer to leave as a stable string and give the user the - // option of casting themselves. 
- case DATETIME => StringType - case RECORD => StructType(field.getSubFields.asScala.map(convert)) - case GEOGRAPHY => StringType - case other => throw new IllegalArgumentException(s"Unsupported type '$other'") - } - var nullable = true - Option(field.getMode) match { - case Some(Field.Mode.REQUIRED) => nullable = false - case Some(Field.Mode.REPEATED) => dataType = ArrayType(dataType, containsNull = true) - case _ => () // nullable field - } - val metadata = new MetadataBuilder - if (field.getDescription != null) { - metadata.putString("description", field.getDescription) - } - StructField(field.getName, dataType, nullable, metadata.build) - } - - - val res = StructType(schema.getFields.asScala.map(convert)) - logDebug(s"BQ Schema:\n'${schema.toString}'\n\nSpark schema:\n${res.treeString}") - res - } - - - /** - * Create a function that converts an Avro row with the given BigQuery schema to a Spark SQL row - * - * The conversion is based on the BigQuery schema, not Avro Schema, because the Avro schema is - * very painful to use. - * - * Not guaranteed to be stable across all versions of Spark. - */ - def createRowConverter(schema: Schema, namesInOrder: Seq[String])(record: GenericRecord) - : InternalRow = { - def convert(field: Field, value: Any): Any = { - if (value == null) { - return null - } - if (field.getMode == Field.Mode.REPEATED) { - // rather than recurring down we strip off the repeated mode - // Due to serialization issues, reconstruct the type using reflection: - // See: https://github.com/googleapis/google-cloud-java/issues/3942 - val fType = LegacySQLTypeName.valueOfStrict(field.getType.name) - val nestedField = Field.newBuilder(field.getName, fType, field.getSubFields) - // As long as this is not repeated it works, but technically arrays cannot contain - // nulls, so select required instead of nullable. - .setMode(Field.Mode.REQUIRED) - .build - return new GenericArrayData( - value.asInstanceOf[java.lang.Iterable[AnyRef]].asScala - .map(v => convert(nestedField, v))) - } - field.getType match { - case INTEGER | FLOAT | BOOLEAN | DATE | TIME | TIMESTAMP => value - // TODO(pmkc): use String for safety? 
- case STRING | DATETIME | GEOGRAPHY => - UTF8String.fromBytes(value.asInstanceOf[Utf8].getBytes) - case BYTES => getBytes(value.asInstanceOf[ByteBuffer]) - case NUMERIC => - val bytes = getBytes(value.asInstanceOf[ByteBuffer]) - Decimal(BigDecimal(BigInt(bytes), BQ_NUMERIC_SCALE), BQ_NUMERIC_PRECISION, - BQ_NUMERIC_SCALE) - case RECORD => - val fields = field.getSubFields.asScala - convertAll(fields, value.asInstanceOf[GenericRecord], fields.map(_.getName)) - case other => throw new IllegalArgumentException(s"Unsupported type '$other'") - } - } - - def getBytes(buf: ByteBuffer) = { - val bytes = new Array[Byte](buf.remaining) - buf.get(bytes) - bytes - } - - // Schema is not recursive so add helper for sequence of fields - def convertAll(fields: Seq[Field], - record: GenericRecord, - namesInOrder: Seq[String]): GenericInternalRow = { - val getValue = fields.zip(Range(0, record.getSchema.getFields.size()).map(record.get)) - .map { case (field, value) => field.getName -> convert(field, value) } - .toMap - new GenericInternalRow(namesInOrder.map(getValue).toArray) - } - - // Output in the order Spark expects - convertAll(schema.getFields.asScala, record, namesInOrder) - } -} - diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala index c174c1951..312249a45 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala @@ -18,7 +18,7 @@ package com.google.cloud.spark.bigquery.direct import com.google.api.gax.rpc.ServerStreamingCallable import com.google.cloud.bigquery.storage.v1.{BigQueryReadClient, DataFormat, ReadRowsRequest, ReadRowsResponse, ReadSession, ReadStream} import com.google.cloud.bigquery.{BigQuery, Schema} -import com.google.cloud.spark.bigquery.{ArrowBinaryIterator, AvroBinaryIterator, BigQueryUtil, SchemaConverters, SparkBigQueryOptions} +import com.google.cloud.spark.bigquery.{ArrowBinaryIterator, AvroBinaryIterator, BigQueryUtil, SparkBigQueryOptions} import com.google.protobuf.ByteString import org.apache.avro.{Schema => AvroSchema} import org.apache.spark.internal.Logging diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index 473eb0e63..82654466f 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -301,7 +301,6 @@ class SparkBigQueryEndToEndITSuite extends FunSuite row should contain theSameElementsInOrderAs expectedRow } - test("known size in bytes") { val actualTableSize = allTypesTable.queryExecution.analyzed.stats.sizeInBytes assert(actualTableSize == ALL_TYPES_TABLE_SIZE) From 4efed2d170ee55b2e685de20543814c9d399e25d Mon Sep 17 00:00:00 2001 From: Gaurangi Saxena Date: Wed, 20 May 2020 15:49:01 -0700 Subject: [PATCH 02/16] move SchemaConverters to java --- .../spark/bigquery/SchemaConverters.java | 189 ++++++++++++++++++ .../spark/bigquery/AvroBinaryIterator.scala | 7 +- .../spark/bigquery/SchemaConverters.scala | 151 -------------- .../spark/bigquery/direct/BigQueryRDD.scala | 2 +- 4 files changed, 195 insertions(+), 154 deletions(-) create mode 100644 
connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java delete mode 100644 connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java b/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java new file mode 100644 index 000000000..7da10746f --- /dev/null +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/SchemaConverters.java @@ -0,0 +1,189 @@ +package com.google.cloud.spark.bigquery; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.stream.Collectors; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.LegacySQLTypeName; +import org.apache.avro.generic.GenericRecord; +import com.google.cloud.bigquery.Schema; +import org.apache.avro.util.Utf8; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +public class SchemaConverters { + // Numeric is a fixed precision Decimal Type with 38 digits of precision and 9 digits of scale. + // See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type + private final static int BQ_NUMERIC_PRECISION = 38; + private final static int BQ_NUMERIC_SCALE = 9; + private final static DecimalType NUMERIC_SPARK_TYPE = DataTypes.createDecimalType( + BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE); + + /** Convert a BigQuery schema to a Spark schema */ + public static StructType toSpark(Schema schema) { + List fieldList = schema.getFields().stream() + .map(SchemaConverters::convert).collect(Collectors.toList()); + StructType structType = new StructType(fieldList.toArray(new StructField[0])); + + return structType; + } + + public static InternalRow createRowConverter(Schema schema, List namesInOrder, GenericRecord record) { + return convertAll(schema.getFields(), record, namesInOrder); + } + + static Object convert(Field field, Object value) { + if (value == null) { + return null; + } + + if (field.getMode() == Field.Mode.REPEATED) { + // rather than recurring down we strip off the repeated mode + // Due to serialization issues, reconstruct the type using reflection: + // See: https://github.com/googleapis/google-cloud-java/issues/3942 + LegacySQLTypeName fType = LegacySQLTypeName.valueOfStrict(field.getType().name()); + Field nestedField = Field.newBuilder(field.getName(), fType, field.getSubFields()) + // As long as this is not repeated it works, but technically arrays cannot contain + // nulls, so select required instead of nullable. 
+ .setMode(Field.Mode.REQUIRED) + .build(); + + List valueList = (List) value; + + return new GenericArrayData(valueList.stream().map(v -> convert(nestedField, v)).collect(Collectors.toList())); + } + + if (LegacySQLTypeName.INTEGER.equals(field.getType()) || + LegacySQLTypeName.FLOAT.equals(field.getType()) || + LegacySQLTypeName.BOOLEAN.equals(field.getType()) || + LegacySQLTypeName.DATE.equals(field.getType()) || + LegacySQLTypeName.TIME.equals(field.getType()) || + LegacySQLTypeName.TIMESTAMP.equals(field.getType())) { + return value; + } + + if (LegacySQLTypeName.STRING.equals(field.getType()) || + LegacySQLTypeName.DATETIME.equals(field.getType()) || + LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) { + return UTF8String.fromBytes(((Utf8)value).getBytes()); + } + + if (LegacySQLTypeName.BYTES.equals(field.getType())) { + return getBytes((ByteBuffer)value); + } + + if (LegacySQLTypeName.NUMERIC.equals(field.getType())) { + byte[] bytes = getBytes((ByteBuffer)value); + BigDecimal b = new BigDecimal(new BigInteger(bytes), BQ_NUMERIC_SCALE); + Decimal d = Decimal.apply(b, BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE); + + return d; + } + + if (LegacySQLTypeName.RECORD.equals(field.getType())) { + return convertAll(field.getSubFields(), + (GenericRecord)value, + field.getSubFields().stream().map(f -> f.getName()).collect(Collectors.toList())); + } + + throw new IllegalStateException("Unexpected type: " + field.getType()); + } + + private static byte[] getBytes(ByteBuffer buf) { + byte[] bytes = new byte[buf.remaining()]; + buf.get(bytes); + + return bytes; + } + + // Schema is not recursive so add helper for sequence of fields + static GenericInternalRow convertAll(FieldList fieldList, + GenericRecord record, + List namesInOrder) { + + Map fieldMap = new HashMap<>(); + + fieldList.stream().forEach(field -> + fieldMap.put(field.getName(), convert(field, record.get(field.getName())))); + + Object[] values = new Object[namesInOrder.size()]; + for (int i = 0; i < namesInOrder.size(); i++) { + values[i] = fieldMap.get(namesInOrder.get(i)); + } + + return new GenericInternalRow(values); + } + + /** + * Create a function that converts an Avro row with the given BigQuery schema to a Spark SQL row + * + * The conversion is based on the BigQuery schema, not Avro Schema, because the Avro schema is + * very painful to use. + * + * Not guaranteed to be stable across all versions of Spark. 
+ */ + + private static StructField convert(Field field) { + DataType dataType = getDataType(field); + boolean nullable = true; + + if (field.getMode() == Field.Mode.REQUIRED) { + nullable = false; + } else if (field.getMode() == Field.Mode.REPEATED) { + dataType = new ArrayType(dataType, true); + } + + MetadataBuilder metadata = new MetadataBuilder(); + if (field.getDescription() != null) { + metadata.putString("description", field.getDescription()); + } + + return new StructField(field.getName(), dataType, nullable, metadata.build()); + } + + private static DataType getDataType(Field field) { + + if (LegacySQLTypeName.INTEGER.equals(field.getType())) { + return DataTypes.LongType; + } else if (LegacySQLTypeName.FLOAT.equals(field.getType())) { + return DataTypes.DoubleType; + } else if (LegacySQLTypeName.NUMERIC.equals(field.getType())) { + return NUMERIC_SPARK_TYPE; + } else if (LegacySQLTypeName.STRING.equals(field.getType())) { + return DataTypes.StringType; + } else if (LegacySQLTypeName.BOOLEAN.equals(field.getType())) { + return DataTypes.BooleanType; + } else if (LegacySQLTypeName.BYTES.equals(field.getType())) { + return DataTypes.BinaryType; + } else if (LegacySQLTypeName.DATE.equals(field.getType())) { + return DataTypes.DateType; + } else if (LegacySQLTypeName.TIMESTAMP.equals(field.getType())) { + return DataTypes.TimestampType; + } else if (LegacySQLTypeName.TIME.equals(field.getType())) { + return DataTypes.LongType; + // TODO(#5): add a timezone to allow parsing to timestamp + // This can be safely cast to TimestampType, but doing so causes the date to be inferred + // as the current date. It's safer to leave as a stable string and give the user the + // option of casting themselves. + } else if (LegacySQLTypeName.DATETIME.equals(field.getType())) { + return DataTypes.StringType; + } else if (LegacySQLTypeName.RECORD.equals(field.getType())) { + List structFields = field.getSubFields().stream().map(SchemaConverters::convert).collect(Collectors.toList()); + return new StructType(structFields.toArray(new StructField[0])); + } else if (LegacySQLTypeName.GEOGRAPHY.equals(field.getType())) { + return DataTypes.StringType; + } else { + throw new IllegalStateException("Unexpected type: " + field.getType()); + } + } +} diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala index 99e0c6336..b49505223 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala @@ -22,6 +22,8 @@ import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.avro.{Schema => AvroSchema} import org.apache.spark.sql.catalyst.InternalRow +import scala.collection.JavaConverters + /** * An iterator for scanning over rows serialized in Avro format * @param bqSchema Schema of underlying BigQuery source @@ -34,11 +36,12 @@ class AvroBinaryIterator(bqSchema: Schema, schema: AvroSchema, rowsInBytes: ByteString) extends Iterator[InternalRow] { - private lazy val converter = SchemaConverters.createRowConverter(bqSchema, columnsInOrder) _ val reader = new GenericDatumReader[GenericRecord](schema) + val columnsInOrderList = JavaConverters.seqAsJavaListConverter(columnsInOrder).asJava val in: BinaryDecoder = new DecoderFactory().binaryDecoder(rowsInBytes.toByteArray, null) override def hasNext: Boolean = !in.isEnd - override def next(): InternalRow = 
converter(reader.read(null, in)) + override def next(): InternalRow = SchemaConverters.createRowConverter(bqSchema, + columnsInOrderList, reader.read(null, in)) } diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala deleted file mode 100644 index cf657246e..000000000 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/SchemaConverters.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright 2018 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.spark.bigquery - -import java.nio.ByteBuffer - -import com.google.cloud.bigquery.LegacySQLTypeName._ -import com.google.cloud.bigquery.{Field, LegacySQLTypeName, Schema} -import org.apache.avro.generic.GenericRecord -import org.apache.avro.util.Utf8 -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.catalyst.util.GenericArrayData -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -import scala.collection.JavaConverters._ - -/** Stateless converters for Converting between Spark and BigQuery types. */ -object SchemaConverters extends Logging { - - // Numeric is a fixed precision Decimal Type with 38 digits of precision and 9 digits of scale. - // See https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type - private val BQ_NUMERIC_PRECISION = 38 - private val BQ_NUMERIC_SCALE = 9 - private lazy val NUMERIC_SPARK_TYPE = DataTypes.createDecimalType( - BQ_NUMERIC_PRECISION, BQ_NUMERIC_SCALE) - - /** Convert a BigQuery schema to a Spark schema */ - def toSpark(schema: Schema): StructType = { - def convert(field: Field): StructField = { - var dataType = field.getType match { - // TODO(#1): Support NUMERIC - case INTEGER => LongType - case FLOAT => DoubleType - case NUMERIC => NUMERIC_SPARK_TYPE - case STRING => StringType - case BOOLEAN => BooleanType - case BYTES => BinaryType - case DATE => DateType - case TIMESTAMP => TimestampType - case TIME => LongType - // TODO(#5): add a timezone to allow parsing to timestamp - // This can be safely cast to TimestampType, but doing so causes the date to be inferred - // as the current date. It's safer to leave as a stable string and give the user the - // option of casting themselves. 
- case DATETIME => StringType - case RECORD => StructType(field.getSubFields.asScala.map(convert)) - case GEOGRAPHY => StringType - case other => throw new IllegalArgumentException(s"Unsupported type '$other'") - } - var nullable = true - Option(field.getMode) match { - case Some(Field.Mode.REQUIRED) => nullable = false - case Some(Field.Mode.REPEATED) => dataType = ArrayType(dataType, containsNull = true) - case _ => () // nullable field - } - val metadata = new MetadataBuilder - if (field.getDescription != null) { - metadata.putString("description", field.getDescription) - } - StructField(field.getName, dataType, nullable, metadata.build) - } - - - val res = StructType(schema.getFields.asScala.map(convert)) - logDebug(s"BQ Schema:\n'${schema.toString}'\n\nSpark schema:\n${res.treeString}") - res - } - - - /** - * Create a function that converts an Avro row with the given BigQuery schema to a Spark SQL row - * - * The conversion is based on the BigQuery schema, not Avro Schema, because the Avro schema is - * very painful to use. - * - * Not guaranteed to be stable across all versions of Spark. - */ - def createRowConverter(schema: Schema, namesInOrder: Seq[String])(record: GenericRecord) - : InternalRow = { - def convert(field: Field, value: Any): Any = { - if (value == null) { - return null - } - if (field.getMode == Field.Mode.REPEATED) { - // rather than recurring down we strip off the repeated mode - // Due to serialization issues, reconstruct the type using reflection: - // See: https://github.com/googleapis/google-cloud-java/issues/3942 - val fType = LegacySQLTypeName.valueOfStrict(field.getType.name) - val nestedField = Field.newBuilder(field.getName, fType, field.getSubFields) - // As long as this is not repeated it works, but technically arrays cannot contain - // nulls, so select required instead of nullable. - .setMode(Field.Mode.REQUIRED) - .build - return new GenericArrayData( - value.asInstanceOf[java.lang.Iterable[AnyRef]].asScala - .map(v => convert(nestedField, v))) - } - field.getType match { - case INTEGER | FLOAT | BOOLEAN | DATE | TIME | TIMESTAMP => value - // TODO(pmkc): use String for safety? 
- case STRING | DATETIME | GEOGRAPHY => - UTF8String.fromBytes(value.asInstanceOf[Utf8].getBytes) - case BYTES => getBytes(value.asInstanceOf[ByteBuffer]) - case NUMERIC => - val bytes = getBytes(value.asInstanceOf[ByteBuffer]) - Decimal(BigDecimal(BigInt(bytes), BQ_NUMERIC_SCALE), BQ_NUMERIC_PRECISION, - BQ_NUMERIC_SCALE) - case RECORD => - val fields = field.getSubFields.asScala - convertAll(fields, value.asInstanceOf[GenericRecord], fields.map(_.getName)) - case other => throw new IllegalArgumentException(s"Unsupported type '$other'") - } - } - - def getBytes(buf: ByteBuffer) = { - val bytes = new Array[Byte](buf.remaining) - buf.get(bytes) - bytes - } - - // Schema is not recursive so add helper for sequence of fields - def convertAll(fields: Seq[Field], - record: GenericRecord, - namesInOrder: Seq[String]): GenericInternalRow = { - val getValue = fields.zip(Range(0, record.getSchema.getFields.size()).map(record.get)) - .map { case (field, value) => field.getName -> convert(field, value) } - .toMap - new GenericInternalRow(namesInOrder.map(getValue).toArray) - } - - // Output in the order Spark expects - convertAll(schema.getFields.asScala, record, namesInOrder) - } -} - diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala index c174c1951..312249a45 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala @@ -18,7 +18,7 @@ package com.google.cloud.spark.bigquery.direct import com.google.api.gax.rpc.ServerStreamingCallable import com.google.cloud.bigquery.storage.v1.{BigQueryReadClient, DataFormat, ReadRowsRequest, ReadRowsResponse, ReadSession, ReadStream} import com.google.cloud.bigquery.{BigQuery, Schema} -import com.google.cloud.spark.bigquery.{ArrowBinaryIterator, AvroBinaryIterator, BigQueryUtil, SchemaConverters, SparkBigQueryOptions} +import com.google.cloud.spark.bigquery.{ArrowBinaryIterator, AvroBinaryIterator, BigQueryUtil, SparkBigQueryOptions} import com.google.protobuf.ByteString import org.apache.avro.{Schema => AvroSchema} import org.apache.spark.internal.Logging From 234a2f08aa28061a762d4b182aa8f02bba0157ca Mon Sep 17 00:00:00 2001 From: Gaurangi Saxena Date: Thu, 21 May 2020 13:11:53 -0700 Subject: [PATCH 03/16] port avro and arrow binary iterators to java --- .../spark/bigquery/ArrowBinaryIterator.java | 121 ++++++++++++++++++ .../spark/bigquery/AvroBinaryIterator.java | 64 +++++++++ .../spark/bigquery/ArrowBinaryIterator.scala | 110 ---------------- .../spark/bigquery/AvroBinaryIterator.scala | 47 ------- .../spark/bigquery/direct/BigQueryRDD.scala | 11 +- .../spark/bigquery/SchemaIteratorSuite.scala | 11 +- 6 files changed, 196 insertions(+), 168 deletions(-) create mode 100644 connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java create mode 100644 connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java delete mode 100644 connector/src/main/scala/com/google/cloud/spark/bigquery/ArrowBinaryIterator.scala delete mode 100644 connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java new file mode 100644 index 000000000..168ec6e6b --- /dev/null +++ 
b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java @@ -0,0 +1,121 @@ +package com.google.cloud.spark.bigquery; + +import com.google.protobuf.ByteString; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.SequenceInputStream; +import java.util.*; +import java.util.stream.Collectors; + +public class ArrowBinaryIterator implements Iterator { + + private static long maxAllocation = Long.MAX_VALUE; + ArrowReaderIterator arrowReaderIterator; + Iterator currentIterator; + List columnsInOrder; + + public ArrowBinaryIterator(List columnsInOrder, ByteString schema, ByteString rowsInBytes) { + BufferAllocator allocator = (new RootAllocator(maxAllocation)).newChildAllocator("ArrowBinaryIterator", + 0, maxAllocation); + + SequenceInputStream bytesWithSchemaStream = new SequenceInputStream( + new ByteArrayInputStream(schema.toByteArray()), + new ByteArrayInputStream(rowsInBytes.toByteArray())); + + ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bytesWithSchemaStream, allocator); + arrowReaderIterator = new ArrowReaderIterator(arrowStreamReader); + this.columnsInOrder = columnsInOrder; + initializeCurrentIterator(); + } + + @Override + public boolean hasNext() { + initializeCurrentIterator(); + return currentIterator.hasNext(); + } + + @Override + public InternalRow next() { + return currentIterator.next(); + } + + private void initializeCurrentIterator() { + while (currentIterator == null || !currentIterator.hasNext()) { + if (arrowReaderIterator.hasNext()) { + currentIterator = toArrowRows(arrowReaderIterator.next(), columnsInOrder); + } else { + break; + } + arrowReaderIterator.next(); + } + } + + private Iterator toArrowRows(VectorSchemaRoot root, List namesInOrder) { + List vectors = namesInOrder.stream().map(name -> root.getVector(name)).collect(Collectors.toList()); + ColumnVector[] columns = vectors.stream().map(vector -> new ArrowSchemaConverter(vector)) + .collect(Collectors.toList()).toArray(new ColumnVector[0]); + + ColumnarBatch batch = new ColumnarBatch(columns); + batch.setNumRows(root.getRowCount()); + return batch.rowIterator(); + } +} + +class ArrowReaderIterator implements Iterator { + + boolean closed = false; + VectorSchemaRoot current = null; + ArrowReader reader = null; + private static final Logger log = LoggerFactory.getLogger(AvroBinaryIterator.class); + + public ArrowReaderIterator(ArrowReader reader) { + this.reader = reader; + } + + @Override + public boolean hasNext() { + if (current != null) { + return true; + } + + try { + boolean res = reader.loadNextBatch(); + if (res) { + current = reader.getVectorSchemaRoot(); + } else { + ensureClosed(); + } + return res; + } catch (Exception e) { + log.error("Exception occurred in next() of ArrowBinaryIterator", e); + } + + return false; + } + + @Override + public VectorSchemaRoot next() { + VectorSchemaRoot res = current; + current = null; + return res; + } + + private void ensureClosed() throws IOException { + if (!closed) { + reader.close(); + 
closed = true; + } + } +} diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java new file mode 100644 index 000000000..cf9f9cedc --- /dev/null +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java @@ -0,0 +1,64 @@ +package com.google.cloud.spark.bigquery; + +import com.google.cloud.bigquery.Schema; +import com.google.protobuf.ByteString; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.spark.sql.catalyst.InternalRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class AvroBinaryIterator implements Iterator { + + GenericDatumReader reader; + List columnsInOrder; + BinaryDecoder in; + Schema bqSchema; + private static final Logger log = LoggerFactory.getLogger(AvroBinaryIterator.class); + + /** + * An iterator for scanning over rows serialized in Avro format + * @param bqSchema Schema of underlying BigQuery source + * @param columnsInOrder Sequence of columns in the schema + * @param schema Schema in avro format + * @param rowsInBytes Rows serialized in binary format for Avro + */ + public AvroBinaryIterator(Schema bqSchema, + List columnsInOrder, + org.apache.avro.Schema schema, + ByteString rowsInBytes) { + reader = new GenericDatumReader(schema); + this.bqSchema = bqSchema; + this.columnsInOrder = columnsInOrder; + in = new DecoderFactory().binaryDecoder(rowsInBytes.toByteArray(), null); + } + + @Override + public boolean hasNext() { + try { + return !in.isEnd(); + } catch (IOException e) { + log.error("Exception occurred in hasNext() of AvroBinaryIterator", e); + } + + return false; + } + + @Override + public InternalRow next() { + try { + return SchemaConverters.createRowConverter(bqSchema, + columnsInOrder, (GenericRecord) reader.read(null, in)); + } catch (IOException e) { + log.error("Exception occurred in next() of AvroBinaryIterator", e); + } + + return null; + } +} diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/ArrowBinaryIterator.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/ArrowBinaryIterator.scala deleted file mode 100644 index 9b17edc78..000000000 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/ArrowBinaryIterator.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2018 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.spark.bigquery - -import java.io.{ByteArrayInputStream, SequenceInputStream} - -import com.google.protobuf.ByteString -import org.apache.arrow.memory.RootAllocator -import org.apache.arrow.vector.VectorSchemaRoot -import org.apache.arrow.vector.ipc.{ArrowReader, ArrowStreamReader} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} - -import collection.JavaConverters._ - -/** - * An iterator for scanning over rows serialized in Arrow format - * @param columnsInOrder Sequence of columns in the schema - * @param schema Schema in arrow format - * @param rowsInBytes Rows serialized in binary format for Arrow - */ -class ArrowBinaryIterator(columnsInOrder: Seq[String], - schema: ByteString, - rowsInBytes: ByteString) extends Iterator[InternalRow] { - - val allocator = ArrowBinaryIterator.rootAllocator.newChildAllocator("ArrowBinaryIterator", - 0, ArrowBinaryIterator.maxAllocation) - - val bytesWithSchemaStream = new SequenceInputStream( - new ByteArrayInputStream(schema.toByteArray), - new ByteArrayInputStream(rowsInBytes.toByteArray)) - - val arrowStreamReader = new ArrowStreamReader(bytesWithSchemaStream, allocator) - - val arrowReaderIterator = new ArrowReaderIterator(arrowStreamReader) - val iterator = arrowReaderIterator.flatMap(root => toArrowRows(root, columnsInOrder)) - - private def toArrowRows(root: VectorSchemaRoot, namesInOrder: Seq[String]): - Iterator[InternalRow] = { - val vectors = namesInOrder.map(root.getVector) - - val columns = vectors.map { vector => - try { - new ArrowSchemaConverter(vector).asInstanceOf[ColumnVector] - } catch { - case e : UnsupportedOperationException => - throw new Exception(vector.getClass.getName, e) - } - }.toArray - - val batch = new ColumnarBatch(columns) - batch.setNumRows(root.getRowCount) - batch.rowIterator().asScala - } - - class ArrowReaderIterator(reader: ArrowReader) extends Iterator[VectorSchemaRoot] { - var closed = false - var current: VectorSchemaRoot = null - - def ensureClosed(): Unit = { - if (!closed) { - reader.close() - closed = true - } - } - - override def hasNext: Boolean = { - if (current != null) { - return true - } - val res = reader.loadNextBatch() - if (res) { - current = reader.getVectorSchemaRoot - } else { - ensureClosed() - } - res - } - - override def next(): VectorSchemaRoot = { - assert(current != null) - val res = current - current = null - res - } - } - - override def hasNext: Boolean = iterator.hasNext - - override def next(): InternalRow = iterator.next() -} - -object ArrowBinaryIterator { - // max allocation value for the allocator - var maxAllocation = Long.MaxValue - val rootAllocator = new RootAllocator(maxAllocation) -} diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala deleted file mode 100644 index b49505223..000000000 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/AvroBinaryIterator.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2018 Google Inc. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.spark.bigquery - -import com.google.cloud.bigquery.Schema -import com.google.protobuf.ByteString -import org.apache.avro.generic.{GenericDatumReader, GenericRecord} -import org.apache.avro.io.{BinaryDecoder, DecoderFactory} -import org.apache.avro.{Schema => AvroSchema} -import org.apache.spark.sql.catalyst.InternalRow - -import scala.collection.JavaConverters - -/** - * An iterator for scanning over rows serialized in Avro format - * @param bqSchema Schema of underlying BigQuery source - * @param columnsInOrder Sequence of columns in the schema - * @param schema Schema in avro format - * @param rowsInBytes Rows serialized in binary format for Avro - */ -class AvroBinaryIterator(bqSchema: Schema, - columnsInOrder: Seq[String], - schema: AvroSchema, - rowsInBytes: ByteString) extends Iterator[InternalRow] { - - val reader = new GenericDatumReader[GenericRecord](schema) - val columnsInOrderList = JavaConverters.seqAsJavaListConverter(columnsInOrder).asJava - val in: BinaryDecoder = new DecoderFactory().binaryDecoder(rowsInBytes.toByteArray, null) - - override def hasNext: Boolean = !in.isEnd - - override def next(): InternalRow = SchemaConverters.createRowConverter(bqSchema, - columnsInOrderList, reader.read(null, in)) -} diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala index 312249a45..133e44809 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/BigQueryRDD.scala @@ -25,12 +25,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.arrow.ArrowUtils import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.MutableList._ class BigQueryRDD(sc: SparkContext, parts: Array[Partition], @@ -88,9 +85,9 @@ case class ArrowConverter(columnsInOrder: Seq[String], { def getIterator(): Iterator[InternalRow] = { rowResponseIterator.flatMap(readRowResponse => - new ArrowBinaryIterator(columnsInOrder, + new ArrowBinaryIterator(columnsInOrder.asJava, rawArrowSchema, - readRowResponse.getArrowRecordBatch.getSerializedRecordBatch)); + readRowResponse.getArrowRecordBatch.getSerializedRecordBatch).asScala); } } @@ -116,9 +113,9 @@ case class AvroConverter (bqSchema: Schema, def toRows(response: ReadRowsResponse): Iterator[InternalRow] = new AvroBinaryIterator( bqSchema, - columnsInOrder, + columnsInOrder.asJava, avroSchema, - response.getAvroRows.getSerializedBinaryRows) + response.getAvroRows.getSerializedBinaryRows).asScala } case class BigQueryPartition(stream: String, index: Int) extends Partition diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala 
b/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala index a66223ef4..d1a050a84 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala @@ -23,7 +23,9 @@ import com.google.protobuf.ByteString import org.apache.avro.{Schema => AvroSchema} import org.apache.spark.sql.types.{ArrayType, BinaryType} import org.apache.spark.sql.types._ -import org.scalatest.{FunSuite} +import org.scalatest.FunSuite + +import scala.collection.JavaConverters._ /** * A test for ensuring that Arrow and Avros Schema generate same results for @@ -68,11 +70,12 @@ class SchemaIteratorSuite extends FunSuite { val schemaFields = SchemaConverters.toSpark(bqSchema).fields - val arrowSparkRow = new ArrowBinaryIterator(columnsInOrder, arrowSchema, arrowByteString) - .next() + val arrowSparkRow = new ArrowBinaryIterator(columnsInOrder.asJava, + arrowSchema, + arrowByteString).asScala.next() val avroSparkRow = new AvroBinaryIterator(bqSchema, - columnsInOrder, avroSchema, avroByteString).next() + columnsInOrder.asJava, avroSchema, avroByteString).next() for (col <- 0 to 11) { From 5a3ee6528e2f2128696c2974c936b650a380fd8d Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Thu, 21 May 2020 14:30:28 -0700 Subject: [PATCH 04/16] add unchecked IO exception --- .../spark/bigquery/ArrowBinaryIterator.java | 20 +++++++++---------- .../spark/bigquery/AvroBinaryIterator.java | 9 +++------ 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java index 168ec6e6b..c3ee9fcff 100644 --- a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java @@ -1,5 +1,6 @@ package com.google.cloud.spark.bigquery; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -16,6 +17,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.SequenceInputStream; +import java.io.UncheckedIOException; import java.util.*; import java.util.stream.Collectors; @@ -36,6 +38,7 @@ public ArrowBinaryIterator(List columnsInOrder, ByteString schema, ByteS ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bytesWithSchemaStream, allocator); arrowReaderIterator = new ArrowReaderIterator(arrowStreamReader); + currentIterator = ImmutableList.of().iterator(); this.columnsInOrder = columnsInOrder; initializeCurrentIterator(); } @@ -52,12 +55,11 @@ public InternalRow next() { } private void initializeCurrentIterator() { - while (currentIterator == null || !currentIterator.hasNext()) { - if (arrowReaderIterator.hasNext()) { - currentIterator = toArrowRows(arrowReaderIterator.next(), columnsInOrder); - } else { - break; + while (!currentIterator.hasNext()) { + if (!arrowReaderIterator.hasNext()) { + return; } + currentIterator = toArrowRows(arrowReaderIterator.next(), columnsInOrder); arrowReaderIterator.next(); } } @@ -77,7 +79,7 @@ class ArrowReaderIterator implements Iterator { boolean closed = false; VectorSchemaRoot current = null; - ArrowReader reader = null; + ArrowReader reader; private static final Logger log = LoggerFactory.getLogger(AvroBinaryIterator.class); public 
ArrowReaderIterator(ArrowReader reader) { @@ -98,11 +100,9 @@ public boolean hasNext() { ensureClosed(); } return res; - } catch (Exception e) { - log.error("Exception occurred in next() of ArrowBinaryIterator", e); + } catch (IOException e) { + throw new UncheckedIOException(e); } - - return false; } @Override diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java index cf9f9cedc..136bbc95a 100644 --- a/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.List; @@ -44,10 +45,8 @@ public boolean hasNext() { try { return !in.isEnd(); } catch (IOException e) { - log.error("Exception occurred in hasNext() of AvroBinaryIterator", e); + throw new UncheckedIOException(e); } - - return false; } @Override @@ -56,9 +55,7 @@ public InternalRow next() { return SchemaConverters.createRowConverter(bqSchema, columnsInOrder, (GenericRecord) reader.read(null, in)); } catch (IOException e) { - log.error("Exception occurred in next() of AvroBinaryIterator", e); + throw new UncheckedIOException(e); } - - return null; } } From 62abeb3639f90093e846d7f5c9738d92f829585d Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Thu, 21 May 2020 14:30:28 -0700 Subject: [PATCH 05/16] add unchecked IO exception --- .../spark/bigquery/ArrowBinaryIterator.java | 32 ++++++++----------- .../spark/bigquery/AvroBinaryIterator.java | 9 ++---- .../spark/bigquery/SchemaIteratorSuite.scala | 20 +++++++++--- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java index 168ec6e6b..55fe1c3bc 100644 --- a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java @@ -1,5 +1,6 @@ package com.google.cloud.spark.bigquery; +import com.google.common.collect.ImmutableList; import com.google.protobuf.ByteString; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -16,6 +17,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.SequenceInputStream; +import java.io.UncheckedIOException; import java.util.*; import java.util.stream.Collectors; @@ -36,13 +38,20 @@ public ArrowBinaryIterator(List columnsInOrder, ByteString schema, ByteS ArrowStreamReader arrowStreamReader = new ArrowStreamReader(bytesWithSchemaStream, allocator); arrowReaderIterator = new ArrowReaderIterator(arrowStreamReader); + currentIterator = ImmutableList.of().iterator(); this.columnsInOrder = columnsInOrder; - initializeCurrentIterator(); } @Override public boolean hasNext() { - initializeCurrentIterator(); + while (!currentIterator.hasNext()) { + if (!arrowReaderIterator.hasNext()) { + return false; + } + currentIterator = toArrowRows(arrowReaderIterator.next(), columnsInOrder); + arrowReaderIterator.next(); + } + return currentIterator.hasNext(); } @@ -51,17 +60,6 @@ public InternalRow next() { return currentIterator.next(); } - private void initializeCurrentIterator() { - while (currentIterator == null || !currentIterator.hasNext()) { - if 
(arrowReaderIterator.hasNext()) { - currentIterator = toArrowRows(arrowReaderIterator.next(), columnsInOrder); - } else { - break; - } - arrowReaderIterator.next(); - } - } - private Iterator toArrowRows(VectorSchemaRoot root, List namesInOrder) { List vectors = namesInOrder.stream().map(name -> root.getVector(name)).collect(Collectors.toList()); ColumnVector[] columns = vectors.stream().map(vector -> new ArrowSchemaConverter(vector)) @@ -77,7 +75,7 @@ class ArrowReaderIterator implements Iterator { boolean closed = false; VectorSchemaRoot current = null; - ArrowReader reader = null; + ArrowReader reader; private static final Logger log = LoggerFactory.getLogger(AvroBinaryIterator.class); public ArrowReaderIterator(ArrowReader reader) { @@ -98,11 +96,9 @@ public boolean hasNext() { ensureClosed(); } return res; - } catch (Exception e) { - log.error("Exception occurred in next() of ArrowBinaryIterator", e); + } catch (IOException e) { + throw new UncheckedIOException(e); } - - return false; } @Override diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java index cf9f9cedc..136bbc95a 100644 --- a/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/AvroBinaryIterator.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Iterator; import java.util.List; @@ -44,10 +45,8 @@ public boolean hasNext() { try { return !in.isEnd(); } catch (IOException e) { - log.error("Exception occurred in hasNext() of AvroBinaryIterator", e); + throw new UncheckedIOException(e); } - - return false; } @Override @@ -56,9 +55,7 @@ public InternalRow next() { return SchemaConverters.createRowConverter(bqSchema, columnsInOrder, (GenericRecord) reader.read(null, in)); } catch (IOException e) { - log.error("Exception occurred in next() of AvroBinaryIterator", e); + throw new UncheckedIOException(e); } - - return null; } } diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala index d1a050a84..16d8226ee 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/SchemaIteratorSuite.scala @@ -21,6 +21,7 @@ import com.google.cloud.bigquery.{Field, Schema} import com.google.common.io.ByteStreams.toByteArray import com.google.protobuf.ByteString import org.apache.avro.{Schema => AvroSchema} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{ArrayType, BinaryType} import org.apache.spark.sql.types._ import org.scalatest.FunSuite @@ -70,12 +71,23 @@ class SchemaIteratorSuite extends FunSuite { val schemaFields = SchemaConverters.toSpark(bqSchema).fields - val arrowSparkRow = new ArrowBinaryIterator(columnsInOrder.asJava, + var avroSparkRow: InternalRow = null + var arrowSparkRow : InternalRow = null + + val arrowBinaryIterator = new ArrowBinaryIterator(columnsInOrder.asJava, arrowSchema, - arrowByteString).asScala.next() + arrowByteString).asScala + + if (arrowBinaryIterator.hasNext) { + arrowSparkRow = arrowBinaryIterator.next(); + } - val avroSparkRow = new AvroBinaryIterator(bqSchema, - columnsInOrder.asJava, avroSchema, avroByteString).next() + val avroBinaryIterator = new 
AvroBinaryIterator(bqSchema, + columnsInOrder.asJava, avroSchema, avroByteString) + + if (avroBinaryIterator.hasNext) { + avroSparkRow = avroBinaryIterator.next() + } for (col <- 0 to 11) { From 59bd9eabe63daae7c3020337f308d04312343315 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Wed, 27 May 2020 17:50:20 -0700 Subject: [PATCH 06/16] fix readme file for partition type --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1563a379f..4ecd5919d 100644 --- a/README.md +++ b/README.md @@ -285,9 +285,7 @@ The API Supports a number of options to configure the read partitionField - If not set, the table is partitioned by pseudo column, referenced via either - '_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type. If field is specified, - the table is instead partitioned by this field. The field must be a top-level TIMESTAMP or DATE + If set, the table is partitioned by this field. The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE or REQUIRED.
(Optional). @@ -306,7 +304,9 @@ The API Supports a number of options to configure the read partitionType The only type supported is DAY, which will generate one partition per day. -
(Optional. Default to DAY). + If partitionField is not set, the table is partitioned by pseudo column, referenced via either + '_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type. +
(Optional. Defaults to DAY if PartitionField is specified). Write From e136422b3ca75cb5cec6d40da6e19eeb9eb654a3 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Wed, 27 May 2020 17:50:20 -0700 Subject: [PATCH 07/16] fix readme file for partition type --- README.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 1563a379f..3cf963d12 100644 --- a/README.md +++ b/README.md @@ -285,11 +285,13 @@ The API Supports a number of options to configure the read partitionField - If not set, the table is partitioned by pseudo column, referenced via either - '_PARTITIONTIME' as TIMESTAMP type, or '_PARTITIONDATE' as DATE type. If field is specified, - the table is instead partitioned by this field. The field must be a top-level TIMESTAMP or DATE - field. Its mode must be NULLABLE or REQUIRED. -
(Optional).
+ If field is specified together with `partitionType`, the table is partitioned by this field.
+ The field must be a top-level TIMESTAMP or DATE field. Its mode must be NULLABLE
+ or REQUIRED.
+ If the option is not set for a partitioned table, then the table will be partitioned by pseudo
+ column, referenced via either '_PARTITIONTIME' as TIMESTAMP type, or
+ '_PARTITIONDATE' as DATE type.
+ 
(Optional). Write @@ -305,8 +307,9 @@ The API Supports a number of options to configure the read partitionType - The only type supported is DAY, which will generate one partition per day. -
(Optional. Default to DAY). + The only type supported is DAY, which will generate one partition per day. This option is mandatory + for a target table to be partitioned. +
(Optional. Defaults to DAY if PartitionField is specified). Write From 2cf35154d3dfd3c5dec7d898c426bfc0f7bf2b16 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Tue, 2 Jun 2020 17:47:40 -0700 Subject: [PATCH 08/16] change default read format to avro --- .../cloud/spark/bigquery/SparkBigQueryOptions.scala | 2 +- .../cloud/spark/bigquery/SparkBigQueryOptionsSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptions.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptions.scala index 2c7f2f278..b64f0416b 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptions.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptions.scala @@ -89,7 +89,7 @@ object SparkBigQueryOptions { val ReadDataFormatOption = "readDataFormat" val ViewsEnabledOption = "viewsEnabled" - val DefaultReadDataFormat: DataFormat = DataFormat.AVRO + val DefaultReadDataFormat: DataFormat = DataFormat.ARROW val DefaultFormat: String = "parquet" val DefaultIntermediateFormat: IntermediateFormat = IntermediateFormat(DefaultFormat, FormatOptions.parquet()) diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptionsSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptionsSuite.scala index 96b074b32..419a3ff9a 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptionsSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/SparkBigQueryOptionsSuite.scala @@ -122,7 +122,7 @@ class SparkBigQueryOptionsSuite extends FunSuite { } test("data format - no value set") { - assertResult("AVRO") { + assertResult("ARROW") { val options = SparkBigQueryOptions( parameters, Map.empty[String, String], // allConf @@ -134,10 +134,10 @@ class SparkBigQueryOptionsSuite extends FunSuite { } } - test("Set Read Data Format as Arrow") { - assertResult("ARROW") { + test("Set Read Data Format as Avro") { + assertResult("AVRO") { val options = SparkBigQueryOptions( - parameters + ("readDataFormat" -> "Arrow"), + parameters + ("readDataFormat" -> "Avro"), Map.empty[String, String], // allConf new Configuration, new SQLConf, From 3e836d3ce1f2ce0ce9e465a4f242e3a78a4a255d Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Thu, 4 Jun 2020 15:30:43 -0700 Subject: [PATCH 09/16] fix hasNext method of arrowBinaryIterator --- .../com/google/cloud/spark/bigquery/ArrowBinaryIterator.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java index 69b0e4697..0d5f4f17a 100644 --- a/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java +++ b/connector/src/main/java/com/google/cloud/spark/bigquery/ArrowBinaryIterator.java @@ -103,6 +103,10 @@ public boolean hasNext() { return true; } + if (closed) { + return false; + } + try { boolean res = reader.loadNextBatch(); if (res) { From c34f1f8372329f009eda54e9a7d476a22ea2ebf8 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Thu, 4 Jun 2020 16:11:56 -0700 Subject: [PATCH 10/16] add integration test for caching --- .../bigquery/it/SparkBigQueryEndToEndITSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala 
b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index 382853b25..05413f82c 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -157,6 +157,20 @@ class SparkBigQueryEndToEndITSuite extends FunSuite assert(row(1).isInstanceOf[String]) } + test("cache data frame in DataSource %s. Data Format %s".format(dataSourceFormat, dataFormat)) { + val allTypesTable = readAllTypesTable("bigquery") + writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro") + + val df = spark.read.format("bigquery") + .option("dataset", testDataset) + .option("table", testTable) + .option("readDataFormat", "arrow") + .load().cache() + + assert(df.head() == allTypesTable.head()) + assert(df.schema == allTypesTable.schema) + } + test("number of partitions. DataSource %s. Data Format %s" .format(dataSourceFormat, dataFormat)) { val df = spark.read.format("com.google.cloud.spark.bigquery") From 8c23e4f6edec2aa62dd2a74bf02b3c14c4d350ac Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Thu, 4 Jun 2020 16:20:25 -0700 Subject: [PATCH 11/16] modify test to read from cache --- .../cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index 05413f82c..28fced056 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -168,6 +168,9 @@ class SparkBigQueryEndToEndITSuite extends FunSuite .load().cache() assert(df.head() == allTypesTable.head()) + + // read from cache + assert(df.head() == allTypesTable.head()) assert(df.schema == allTypesTable.schema) } From beefd81213db77ab09cfe57a4580b58da5c4c546 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Tue, 16 Jun 2020 12:52:44 -0700 Subject: [PATCH 12/16] add benchmarking script --- .DS_Store | Bin 0 -> 6148 bytes examples/.DS_Store | Bin 0 -> 6148 bytes examples/python/benchmarking.py | 58 ++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+) create mode 100644 .DS_Store create mode 100644 examples/.DS_Store create mode 100644 examples/python/benchmarking.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..bbd62b6d088502f63a429d582ff483a8e84c6ed5 GIT binary patch literal 6148 zcmeHK%Wl&^6ur}i)(%3=qNrVZgT$hsR1~TRA*Fc;f}o(fzyeSk$8KZf+EMI~mJpN; z{{Z{}pTZaLA$$Owc?kRny9xoC8_nG7nS17O=FUuph*)FfZW2|9$Ux${R7N&Ocs)-A zSx_Y_&`h-9(shc-p`;TOymf(QKr`^SF~Ik(i5)v4p8`tl->ByVk%L-(G~7nI`dtVt zqB1q<9_`RB-K93!*!3smc7hD%a{<#s4-tBDfAyEeu(cep&N}G(I|9d(T*)E48t_cgUb43QmbukSX(!1(+z8~yFb4-0;a$zZ!Jd4=F zNOq`2525D-314((q!#@$TJB+<0aA=PpNUYprb6`yJs+c;Qv^|v9W7@f)6x>q;0yj9 zzcSO?hx3r}#YE3r)Pfrq&%0sWo@PKZ@Xs^A&j$;M>$=i2sm0NOOgsUgODGlsoBv_p z7)$B8(le=117Q*hEul<3F_?s-+){pZrDsx0I5GA3V5(=PUMNiTj`l6#POL7q&NTy? 
zfh+@y^0UJ4|Mu_S|5=d!(hO(@{woGpsb#ksn3B3%3zOq_t%Y=m#D)9Kq!til>N*w% gzlzt9grLsi26SENnbatO=syC22AydJ{wM=K0B|G&djJ3c literal 0 HcmV?d00001 diff --git a/examples/.DS_Store b/examples/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..3d56cc0f3ab11fad24e0a8a279887236e3e0d7fa GIT binary patch literal 6148 zcmeHK&2G~`5S~p#S*JqefJBeJRN~M>DhgGEkgQM+xgoiN15~X|qQ=5_qu3!TA;_-* zZ@{DQ1UwFWv%9H?QaDyoHDk?wv$Hd6@3+?e5RvLl2M0uLBGORCMu6fP;da(FX?RZ) zsB9l889X1T?B{FI;@Ct6c96@S@6+6*Xysqy8T8R_E3k;RX-t>M7$L9d z=PR*4{ui6h%CYbFXFATavg&kxsm50G&h}lcTe@|xbsCS%EUuDjTK1C3TduuIs!{6g z6Jzo#J4s(>!`WH;!3$GVNmdLeHaQy%5%TU$Rt(InXQsuVwEe`cLF+&VXYJkjyxTq8 z4-cPo7yIG-=%@qx@zcd3&=2$7Kn1GK$oig6+^jn_(LBTd8{3}bW;BEq5PYbzo97mcJv?Ga8i*&DTM)H zV3mO_S8enD|MK7W|J5X^2?N5wtztkmkK^MWhUE9wwZZXT8$!>aEF4!myh?##w_?Qd dR=f+f0)NN`F!ESCga;xA0ZW4v!oZC(@CWVdYkL3y literal 0 HcmV?d00001 diff --git a/examples/python/benchmarking.py b/examples/python/benchmarking.py new file mode 100644 index 000000000..f8e1eb303 --- /dev/null +++ b/examples/python/benchmarking.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# An example that shows how to query BigQuery and read those results into Spark. +# +# It relies on https://pypi.org/project/google-cloud-bigquery. 
To see how to +# install python libraries into Google Cloud Dataproc see +# https://cloud.google.com/dataproc/docs/tutorials/python-configuration + +from __future__ import print_function +import time +import getpass +import sys +import csv +import imp +from pyspark.sql import SparkSession + +table = sys.argv[1] +optable = sys.argv[2] +spark = SparkSession.builder.appName('Benchmarking').config('spark.jars.packages','com.databricks:spark-avro_2.11:4.0.0').getOrCreate() + +bucket = "gaurangisaxena" +spark.conf.set("temporaryGcsBucket", bucket) + +df = spark.read.format('bigquery').option('table', table).option('readDataFormat', 'avro').load().cache() +df.write.csv("local2.csv") + +for x in range(0, 5): + outputTable = "benchmark.experiment" + str(x) + "output" + optable + print('output table',outputTable) + start = time.time() + df.write.format("bigquery").mode("overwrite").option("table", outputTable + "orc").option("intermediateFormat", "orc").save() + orcTime = time.time() - start + + start = time.time() + df.write.format("bigquery").mode("overwrite").option("table", outputTable + "parquet").option("intermediateFormat", "parquet").save() + parquetTime = time.time() - start + + start = time.time() + df.write.format("bigquery").mode("overwrite").option("table", outputTable + "avro").option("intermediateFormat", "avro").save() + avroTime = time.time() - start + + print('orc','parquet','avro') + print(orcTime,parquetTime,avroTime) + + From 11fe64d2efd829700751bf46f648bf9943d74580 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Fri, 19 Jun 2020 14:19:55 -0700 Subject: [PATCH 13/16] add materialized view as table type --- .../cloud/spark/bigquery/BigQueryRelationProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala index c0a10eaff..6d6b44456 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala @@ -16,7 +16,7 @@ package com.google.cloud.spark.bigquery import com.google.auth.Credentials -import com.google.cloud.bigquery.TableDefinition.Type.{TABLE, VIEW} +import com.google.cloud.bigquery.TableDefinition.Type.{MATERIALIZED_VIEW, TABLE, VIEW} import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, TableDefinition} import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation import org.apache.spark.sql.sources._ @@ -57,7 +57,7 @@ class BigQueryRelationProvider( .getOrElse(sys.error(s"Table $tableName not found")) table.getDefinition[TableDefinition].getType match { case TABLE => new DirectBigQueryRelation(opts, table)(sqlContext) - case VIEW => if (opts.viewsEnabled) { + case VIEW || MATERIALIZED_VIEW => if (opts.viewsEnabled) { new DirectBigQueryRelation(opts, table)(sqlContext) } else { sys.error( From 0a1d5e356df2d20958a4b1e2569d8314554d4b73 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Fri, 19 Jun 2020 14:19:55 -0700 Subject: [PATCH 14/16] add materialized view as table type --- .../bigquery/BigQueryRelationProvider.scala | 4 +- examples/.DS_Store | Bin 6148 -> 0 bytes examples/python/benchmarking.py | 58 ------------------ 3 files changed, 2 insertions(+), 60 deletions(-) delete mode 100644 examples/.DS_Store delete mode 100644 examples/python/benchmarking.py diff --git 
a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala index c0a10eaff..6d6b44456 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/BigQueryRelationProvider.scala @@ -16,7 +16,7 @@ package com.google.cloud.spark.bigquery import com.google.auth.Credentials -import com.google.cloud.bigquery.TableDefinition.Type.{TABLE, VIEW} +import com.google.cloud.bigquery.TableDefinition.Type.{MATERIALIZED_VIEW, TABLE, VIEW} import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, TableDefinition} import com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation import org.apache.spark.sql.sources._ @@ -57,7 +57,7 @@ class BigQueryRelationProvider( .getOrElse(sys.error(s"Table $tableName not found")) table.getDefinition[TableDefinition].getType match { case TABLE => new DirectBigQueryRelation(opts, table)(sqlContext) - case VIEW => if (opts.viewsEnabled) { + case VIEW || MATERIALIZED_VIEW => if (opts.viewsEnabled) { new DirectBigQueryRelation(opts, table)(sqlContext) } else { sys.error( diff --git a/examples/.DS_Store b/examples/.DS_Store deleted file mode 100644 index 3d56cc0f3ab11fad24e0a8a279887236e3e0d7fa..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHK&2G~`5S~p#S*JqefJBeJRN~M>DhgGEkgQM+xgoiN15~X|qQ=5_qu3!TA;_-* zZ@{DQ1UwFWv%9H?QaDyoHDk?wv$Hd6@3+?e5RvLl2M0uLBGORCMu6fP;da(FX?RZ) zsB9l889X1T?B{FI;@Ct6c96@S@6+6*Xysqy8T8R_E3k;RX-t>M7$L9d z=PR*4{ui6h%CYbFXFATavg&kxsm50G&h}lcTe@|xbsCS%EUuDjTK1C3TduuIs!{6g z6Jzo#J4s(>!`WH;!3$GVNmdLeHaQy%5%TU$Rt(InXQsuVwEe`cLF+&VXYJkjyxTq8 z4-cPo7yIG-=%@qx@zcd3&=2$7Kn1GK$oig6+^jn_(LBTd8{3}bW;BEq5PYbzo97mcJv?Ga8i*&DTM)H zV3mO_S8enD|MK7W|J5X^2?N5wtztkmkK^MWhUE9wwZZXT8$!>aEF4!myh?##w_?Qd dR=f+f0)NN`F!ESCga;xA0ZW4v!oZC(@CWVdYkL3y diff --git a/examples/python/benchmarking.py b/examples/python/benchmarking.py deleted file mode 100644 index f8e1eb303..000000000 --- a/examples/python/benchmarking.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# Copyright 2018 Google Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# An example that shows how to query BigQuery and read those results into Spark. -# -# It relies on https://pypi.org/project/google-cloud-bigquery. 
To see how to -# install python libraries into Google Cloud Dataproc see -# https://cloud.google.com/dataproc/docs/tutorials/python-configuration - -from __future__ import print_function -import time -import getpass -import sys -import csv -import imp -from pyspark.sql import SparkSession - -table = sys.argv[1] -optable = sys.argv[2] -spark = SparkSession.builder.appName('Benchmarking').config('spark.jars.packages','com.databricks:spark-avro_2.11:4.0.0').getOrCreate() - -bucket = "gaurangisaxena" -spark.conf.set("temporaryGcsBucket", bucket) - -df = spark.read.format('bigquery').option('table', table).option('readDataFormat', 'avro').load().cache() -df.write.csv("local2.csv") - -for x in range(0, 5): - outputTable = "benchmark.experiment" + str(x) + "output" + optable - print('output table',outputTable) - start = time.time() - df.write.format("bigquery").mode("overwrite").option("table", outputTable + "orc").option("intermediateFormat", "orc").save() - orcTime = time.time() - start - - start = time.time() - df.write.format("bigquery").mode("overwrite").option("table", outputTable + "parquet").option("intermediateFormat", "parquet").save() - parquetTime = time.time() - start - - start = time.time() - df.write.format("bigquery").mode("overwrite").option("table", outputTable + "avro").option("intermediateFormat", "avro").save() - avroTime = time.time() - start - - print('orc','parquet','avro') - print(orcTime,parquetTime,avroTime) - - From c59ca1fa9076d4d5b5933f186f0eab9669931288 Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Mon, 22 Jun 2020 16:44:15 -0700 Subject: [PATCH 15/16] add support for materialized view --- .../bigquery/connector/common/BigQueryClient.java | 4 ++-- .../bigquery/connector/common/ReadSessionCreator.java | 2 +- .../bigquery/it/SparkBigQueryEndToEndITSuite.scala | 11 ++++++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java index 8236f5c26..9de36bc4a 100644 --- a/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java +++ b/connector/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java @@ -73,7 +73,7 @@ public TableInfo getSupportedTable(TableId tableId, boolean viewsEnabled, String if (TableDefinition.Type.TABLE == tableType) { return table; } - if (TableDefinition.Type.VIEW == tableType) { + if (TableDefinition.Type.VIEW == tableType || TableDefinition.Type.MATERIALIZED_VIEW == tableType) { if (viewsEnabled) { return table; } else { @@ -167,7 +167,7 @@ public long calculateTableSize(TableInfo tableInfo, Optional filter) { TableDefinition.Type type = tableInfo.getDefinition().getType(); if (type == TableDefinition.Type.TABLE && !filter.isPresent()) { return tableInfo.getNumRows().longValue(); - } else if (type == TableDefinition.Type.VIEW || + } else if (type == TableDefinition.Type.VIEW || type == TableDefinition.Type.MATERIALIZED_VIEW || (type == TableDefinition.Type.TABLE && filter.isPresent())) { // run a query String table = fullTableName(tableInfo.getTableId()); diff --git a/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java b/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java index 9c03bc6d6..9b53516b5 100644 --- a/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java +++ 
b/connector/src/main/java/com/google/cloud/bigquery/connector/common/ReadSessionCreator.java @@ -126,7 +126,7 @@ TableInfo getActualTable( if (TableDefinition.Type.TABLE == tableType) { return table; } - if (TableDefinition.Type.VIEW == tableType) { + if (TableDefinition.Type.VIEW == tableType || TableDefinition.Type.MATERIALIZED_VIEW == tableType) { if (!config.viewsEnabled) { throw new BigQueryConnectorException(UNSUPPORTED, format( "Views are not enabled. You can enable views by setting '%s' to true. Notice additional cost may occur.", diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index 28fced056..c56e12a92 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -346,7 +346,7 @@ class SparkBigQueryEndToEndITSuite extends FunSuite countResults should equal(countAfterCollect) } */ - + test("read data types. DataSource %s".format(dataSourceFormat)) { val allTypesTable = readAllTypesTable(dataSourceFormat) val expectedRow = spark.range(1).select(TestConstants.ALL_TYPES_TABLE_COLS: _*).head.toSeq @@ -542,6 +542,15 @@ class SparkBigQueryEndToEndITSuite extends FunSuite assert(df.schema == allTypesTable.schema) } + test("query materialized view") { + var df = spark.read.format("bigquery") + .option("table", "bigquery-public-data:ethereum_blockchain.live_logs") + .option("viewsEnabled", "true") + .option("viewMaterializationProject", "bigquery-public-data") + .option("viewMaterializationDataset", "ethereum_blockchain") + .load() + } + test("write to bq - adding the settings to spark.conf" ) { spark.conf.set("temporaryGcsBucket", temporaryGcsBucket) val df = initialData From b7505d670137635cfa09c85b6d50444ba756968a Mon Sep 17 00:00:00 2001 From: Gaurangi94 Date: Tue, 23 Jun 2020 12:24:27 -0700 Subject: [PATCH 16/16] add conditions for Materialized View --- cloudbuild/cloudbuild.yaml | 2 ++ .../spark/bigquery/direct/DirectBigQueryRelation.scala | 8 ++++++-- .../spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala | 4 ++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cloudbuild/cloudbuild.yaml b/cloudbuild/cloudbuild.yaml index c36af8d7f..bb15d2eaa 100644 --- a/cloudbuild/cloudbuild.yaml +++ b/cloudbuild/cloudbuild.yaml @@ -15,6 +15,8 @@ steps: id: 'integration-tests' entrypoint: 'sbt' args: ['it:test'] + env: + - 'GOOGLE_CLOUD_PROJECT=${_GOOGLE_CLOUD_PROJECT}' # Tests take around 13 mins in general. 
timeout: 1200s diff --git a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala index 55ad03dc0..a901afb04 100644 --- a/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala +++ b/connector/src/main/scala/com/google/cloud/spark/bigquery/direct/DirectBigQueryRelation.scala @@ -193,7 +193,9 @@ private[bigquery] class DirectBigQueryRelation( ): TableInfo = { val tableDefinition = table.getDefinition[TableDefinition] val tableType = tableDefinition.getType - if(options.viewsEnabled && TableDefinition.Type.VIEW == tableType) { + if(options.viewsEnabled && + (TableDefinition.Type.VIEW == tableType || + TableDefinition.Type.MATERIALIZED_VIEW == tableType)) { // get it from the view val querySql = createSql(tableDefinition.getSchema, requiredColumns, filtersString) logDebug(s"querySql is $querySql") @@ -274,7 +276,9 @@ private[bigquery] class DirectBigQueryRelation( def getNumBytes(tableDefinition: TableDefinition): Long = { val tableType = tableDefinition.getType - if (options.viewsEnabled && TableDefinition.Type.VIEW == tableType) { + if (options.viewsEnabled && + (TableDefinition.Type.VIEW == tableType || + TableDefinition.Type.MATERIALIZED_VIEW == tableType)) { sqlContext.sparkSession.sessionState.conf.defaultSizeInBytes } else { tableDefinition.asInstanceOf[StandardTableDefinition].getNumBytes diff --git a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala index c56e12a92..be99d16f0 100644 --- a/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala +++ b/connector/src/test/scala/com/google/cloud/spark/bigquery/it/SparkBigQueryEndToEndITSuite.scala @@ -546,8 +546,8 @@ class SparkBigQueryEndToEndITSuite extends FunSuite var df = spark.read.format("bigquery") .option("table", "bigquery-public-data:ethereum_blockchain.live_logs") .option("viewsEnabled", "true") - .option("viewMaterializationProject", "bigquery-public-data") - .option("viewMaterializationDataset", "ethereum_blockchain") + .option("viewMaterializationProject", System.getenv("GOOGLE_CLOUD_PROJECT")) + .option("viewMaterializationDataset", testDataset) .load() }
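
Taken together, the last four patches let the connector accept a materialized view wherever a logical view is accepted, as long as views are explicitly enabled. A minimal read sketch, assuming an existing SparkSession named `spark`; the project, dataset, and view names below are placeholders, and the option keys are the ones exercised by the integration test above:

    // Sketch only: reading a BigQuery materialized view through the connector.
    // "my-project", "my_dataset" and "my_materialized_view" are placeholders.
    val df = spark.read.format("bigquery")
      .option("table", "my-project:my_dataset.my_materialized_view")
      .option("viewsEnabled", "true")                       // views and materialized views are opt-in
      .option("viewMaterializationProject", "my-project")   // project for the temporary materialization table
      .option("viewMaterializationDataset", "my_dataset")   // dataset for the temporary materialization table
      .load()

    df.printSchema()

The two materialization options mirror how the end-to-end test resolves them (project from GOOGLE_CLOUD_PROJECT, dataset from the test dataset), so in practice they should point at a project and dataset the job is allowed to write to.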