diff --git a/LICENSE b/LICENSE index 325ba653bf888..341988ee73550 100644 --- a/LICENSE +++ b/LICENSE @@ -241,27 +241,17 @@ This product includes code from https://github.com/twitter/commons/blob/master/s limitations under the License. ================================================================================================= -This product includes code from Databricks spark-avro with the below license +This product includes code from Apache Spark -* org.apache.hudi.AvroConversionHelper copied from classes in com/databricks/spark/avro package +* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package - Copyright 2014 Databricks - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Copyright: 2014 and onwards The Apache Software Foundation +Home page: http://spark.apache.org/ +License: http://www.apache.org/licenses/LICENSE-2.0 -------------------------------------------------------------------------------- -This product includes code from https://github.com/big-data-europe/README +This product includes code from https://github.com/big-data-europe/README * docker/hoodie/hadoop/base/entrypoint.sh copied from https://github.com/big-data-europe/docker-hadoop/blob/master/base/entrypoint.sh diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java index 2e08158c5ab5e..16d9aae64f91f 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java @@ -174,6 +174,8 @@ private static String convertField(final Type parquetType) { final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata(); return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ") .append(decimalMetadata.getScale()).append(")").toString(); + } else if (originalType == OriginalType.DATE) { + return field.append("DATE").toString(); } // TODO - fix the method naming here return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter() { diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 5e6f90e6a37de..39ac694842ddb 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -107,7 +107,7 @@ static String getSparkShellCommand(String commandFile) { .append(" --master local[2] --driver-class-path ").append(HADOOP_CONF_DIR) .append( " --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1 ") - .append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString(); + .append(" --packages org.apache.spark:spark-avro_2.11:2.4.4 ").append(" -i ").append(commandFile).toString(); } static String getPrestoConsoleCommand(String commandFile) { diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml index b490dd740dce1..d69954a1812c7 100644 --- a/hudi-spark/pom.xml +++ b/hudi-spark/pom.xml @@ -213,9 +213,9 @@ - com.databricks + org.apache.spark spark-avro_2.11 - 4.0.0 + provided @@ -239,8 +239,19 @@ ${hive.groupid} - hive-service + hive-exec ${hive.version} + ${hive.exec.classifier} + + + javax.mail + mail + + + org.eclipse.jetty.aggregate + * + + ${hive.groupid} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 97f89f945106d..8244fc327ad45 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -1,13 +1,12 @@ /* - * This code is copied from com.databricks:spark-avro with following license + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * Copyright 2014 Databricks - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,14 +21,15 @@ import java.nio.ByteBuffer import java.sql.{Date, Timestamp} import java.util -import com.databricks.spark.avro.SchemaConverters -import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException -import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.Conversions.DecimalConversion +import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} +import org.apache.avro.{LogicalTypes, Schema} import org.apache.avro.Schema.Type._ import org.apache.avro.generic.GenericData.{Fixed, Record} -import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace import org.apache.spark.sql.Row +import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ @@ -37,6 +37,16 @@ import scala.collection.JavaConverters._ object AvroConversionHelper { + private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = { + if (precision <= Decimal.MAX_LONG_DIGITS) { + // Constructs a `Decimal` with an unscaled `Long` value if possible. + Decimal(decimal.unscaledValue().longValue(), precision, scale) + } else { + // Otherwise, resorts to an unscaled `BigInteger` instead. + Decimal(decimal, precision, scale) + } + } + /** * * Returns a converter function to convert row in avro format to GenericRow of catalyst. @@ -76,7 +86,50 @@ object AvroConversionHelper { byteBuffer.get(bytes) bytes } - + case (d: DecimalType, FIXED) => + (item: AnyRef) => + if (item == null) { + null + } else { + val decimalConversion = new DecimalConversion + val bigDecimal = decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema, + LogicalTypes.decimal(d.precision, d.scale)) + createDecimal(bigDecimal, d.precision, d.scale) + } + case (d: DecimalType, BYTES) => + (item: AnyRef) => + if (item == null) { + null + } else { + val decimalConversion = new DecimalConversion + val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema, + LogicalTypes.decimal(d.precision, d.scale)) + createDecimal(bigDecimal, d.precision, d.scale) + } + case (DateType, INT) => + (item: AnyRef) => + if (item == null) { + null + } else { + new Date(item.asInstanceOf[Long]) + } + case (TimestampType, LONG) => + (item: AnyRef) => + if (item == null) { + null + } else { + avroSchema.getLogicalType match { + case _: TimestampMillis => + new Timestamp(item.asInstanceOf[Long]) + case _: TimestampMicros => + new Timestamp(item.asInstanceOf[Long] / 1000) + case null => + new Timestamp(item.asInstanceOf[Long]) + case other => + throw new IncompatibleSchemaException( + s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") + } + } case (struct: StructType, RECORD) => val length = struct.fields.length val converters = new Array[AnyRef => AnyRef](length) @@ -216,7 +269,8 @@ object AvroConversionHelper { createConverter(sourceAvroSchema, targetSqlType, List.empty[String]) } - def createConverterToAvro(dataType: DataType, + def createConverterToAvro(avroSchema: Schema, + dataType: DataType, structName: String, recordNamespace: String): Any => Any = { dataType match { @@ -231,13 +285,22 @@ object AvroConversionHelper { if (item == null) null else item.asInstanceOf[Byte].intValue case ShortType => (item: Any) => if (item == null) null else item.asInstanceOf[Short].intValue - case _: DecimalType => (item: Any) => if (item == null) null else item.toString + case dec: DecimalType => (item: Any) => + Option(item).map { i => + val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal] + val decimalConversions = new DecimalConversion() + decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0), + LogicalTypes.decimal(dec.precision, dec.scale)) + }.orNull case TimestampType => (item: Any) => - if (item == null) null else item.asInstanceOf[Timestamp].getTime + // Convert time to microseconds since spark-avro by default converts TimestampType to + // Avro Logical TimestampMicros + Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull case DateType => (item: Any) => - if (item == null) null else item.asInstanceOf[Date].getTime + Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull case ArrayType(elementType, _) => val elementConverter = createConverterToAvro( + avroSchema, elementType, structName, getNewRecordNamespace(elementType, recordNamespace, structName)) @@ -258,6 +321,7 @@ object AvroConversionHelper { } case MapType(StringType, valueType, _) => val valueConverter = createConverterToAvro( + avroSchema, valueType, structName, getNewRecordNamespace(valueType, recordNamespace, structName)) @@ -273,11 +337,10 @@ object AvroConversionHelper { } } case structType: StructType => - val builder = SchemaBuilder.record(structName).namespace(recordNamespace) - val schema: Schema = SchemaConverters.convertStructToAvro( - structType, builder, recordNamespace) + val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace) val fieldConverters = structType.fields.map(field => createConverterToAvro( + avroSchema, field.dataType, field.name, getNewRecordNamespace(field.dataType, recordNamespace, field.name))) diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 372b44afb02f5..a27d0ee5d5d91 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -17,11 +17,11 @@ package org.apache.hudi -import com.databricks.spark.avro.SchemaConverters import org.apache.avro.generic.GenericRecord -import org.apache.avro.{Schema, SchemaBuilder} import org.apache.hudi.common.model.HoodieKey +import org.apache.avro.Schema import org.apache.spark.rdd.RDD +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -30,13 +30,20 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object AvroConversionUtils { def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = { + val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace) + createRdd(df, avroSchema.toString, structName, recordNamespace) + } + + def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String) + : RDD[GenericRecord] = { val dataType = df.schema val encoder = RowEncoder.apply(dataType).resolveAndBind() df.queryExecution.toRdd.map(encoder.fromRow) .mapPartitions { records => if (records.isEmpty) Iterator.empty else { - val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace) + val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString) + val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace) records.map { x => convertor(x).asInstanceOf[GenericRecord] } } } @@ -75,11 +82,10 @@ object AvroConversionUtils { def convertStructTypeToAvroSchema(structType: StructType, structName: String, recordNamespace: String): Schema = { - val builder = SchemaBuilder.record(structName).namespace(recordNamespace) - SchemaConverters.convertStructToAvro(structType, builder, recordNamespace) + SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace) } def convertAvroSchemaToStructType(avroSchema: Schema): StructType = { - SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]; + SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] } } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index e1b286f9ea453..ab67c6f123c49 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -104,6 +104,7 @@ public void run() throws Exception { SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate(); JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext()); + spark.sparkContext().setLogLevel("WARN"); FileSystem fs = FileSystem.get(jssc.hadoopConfiguration()); // Generator of some records to be loaded in. diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 4c88eb05d88bf..53a3a1a3acec0 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -161,7 +161,7 @@ - com.databricks + org.apache.spark spark-avro_2.11 provided diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml index 5237d2281d191..3c40c69dc2b16 100644 --- a/packaging/hudi-hadoop-mr-bundle/pom.xml +++ b/packaging/hudi-hadoop-mr-bundle/pom.xml @@ -89,7 +89,7 @@ org.apache.avro. - ${mr.bundle.avro.shade.prefix}org.apache.avro. + org.apache.hudi.org.apache.avro. false @@ -143,17 +143,7 @@ org.apache.avro avro - ${mr.bundle.avro.scope} + compile - - - - mr-bundle-shade-avro - - compile - org.apache.hudi. - - - diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 55d07fab131b1..c56c789295b49 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -94,8 +94,6 @@ org.apache.hive:hive-service-rpc org.apache.hive:hive-metastore org.apache.hive:hive-jdbc - - com.databricks:spark-avro_2.11 @@ -139,10 +137,6 @@ org.apache.commons.codec. org.apache.hudi.org.apache.commons.codec. - - com.databricks. - org.apache.hudi.com.databricks. - diff --git a/pom.xml b/pom.xml index f3f51c2db9c31..d4d47c0bb6e9a 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 1.8 2.6.7 2.17 - 1.8.1 + 1.10.1 4.11 4.10 1.10.19 @@ -88,8 +88,8 @@ 2.3.1 core 4.1.1 - 2.1.0 - 1.7.7 + 2.4.4 + 1.8.2 2.11.8 2.11 0.12 @@ -105,8 +105,6 @@ ${skipTests} UTF-8 ${project.basedir} - provided - provided provided @@ -485,9 +483,10 @@ - com.databricks + org.apache.spark spark-avro_2.11 - 4.0.0 + ${spark.version} + provided @@ -934,13 +933,6 @@ file://${project.basedir}/src/test/resources/log4j-surefire-quiet.properties - - aws-emr-profile - - compile - org.apache.hudi. - - javadocs