Merged
22 changes: 6 additions & 16 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -241,27 +241,17 @@ This product includes code from https://github.com/twitter/commons/blob/master/s
limitations under the License.
=================================================================================================

This product includes code from Databricks spark-avro with the below license
umehrot2 marked this conversation as resolved.
This product includes code from Apache Spark

* org.apache.hudi.AvroConversionHelper copied from classes in com/databricks/spark/avro package
* org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package

Copyright 2014 Databricks

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright: 2014 and onwards The Apache Software Foundation
Home page: http://spark.apache.org/
License: http://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from https://github.com/big-data-europe/README

* docker/hoodie/hadoop/base/entrypoint.sh copied from https://github.com/big-data-europe/docker-hadoop/blob/master/base/entrypoint.sh

@@ -174,6 +174,8 @@ private static String convertField(final Type parquetType) {
final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata();
return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ")
.append(decimalMetadata.getScale()).append(")").toString();
} else if (originalType == OriginalType.DATE) {
return field.append("DATE").toString();
}
// TODO - fix the method naming here
return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() {
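The hunk above adds a DATE branch next to the existing DECIMAL handling in convertField. A minimal JDK-only sketch of the two branches (helper names are hypothetical; the real method reads precision and scale from Parquet's DecimalMetadata and falls through to a primitive-type converter for everything else):

```java
// Hypothetical sketch of the DECIMAL/DATE branches added above.
// Takes plain (precision, scale) ints rather than Parquet's DecimalMetadata.
public class HiveTypeSketch {

    // Mirrors the StringBuilder-based formatting in the diff, including
    // the space around the comma.
    static String decimalType(int precision, int scale) {
        return new StringBuilder("DECIMAL(")
                .append(precision).append(" , ")
                .append(scale).append(")").toString();
    }

    // The new branch: OriginalType.DATE maps straight to Hive's DATE.
    static String dateType() {
        return "DATE";
    }

    public static void main(String[] args) {
        System.out.println(decimalType(10, 2)); // DECIMAL(10 , 2)
        System.out.println(dateType());         // DATE
    }
}
```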
@@ -107,7 +107,7 @@ static String getSparkShellCommand(String commandFile) {
.append(" --master local[2] --driver-class-path ").append(HADOOP_CONF_DIR)
.append(
" --conf spark.sql.hive.convertMetastoreParquet=false --deploy-mode client --driver-memory 1G --executor-memory 1G --num-executors 1 ")
.append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString();
.append(" --packages org.apache.spark:spark-avro_2.11:2.4.4 ").append(" -i ").append(commandFile).toString();
}

static String getPrestoConsoleCommand(String commandFile) {
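The change above swaps the --packages coordinate from com.databricks:spark-avro_2.11:4.0.0 to org.apache.spark:spark-avro_2.11:2.4.4. A self-contained sketch of the command assembly (flags trimmed down and constants hypothetical; the real method also wires in HADOOP_CONF_DIR, deploy mode, and memory settings):

```java
// Sketch of the spark-shell launch command after the package swap.
// Only the flags relevant to this PR are kept; commandFile is caller-supplied.
public class SparkShellCommandSketch {

    static String getSparkShellCommand(String commandFile) {
        return new StringBuilder("spark-shell --master local[2]")
                // KryoSerializer is a Hudi prerequisite discussed in the review thread below
                .append(" --conf spark.serializer=org.apache.spark.serializer.KryoSerializer")
                // the Apache coordinate that replaces com.databricks:spark-avro
                .append(" --packages org.apache.spark:spark-avro_2.11:2.4.4")
                .append(" -i ").append(commandFile).toString();
    }

    public static void main(String[] args) {
        System.out.println(getSparkShellCommand("commands.scala"));
    }
}
```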
17 changes: 14 additions & 3 deletions hudi-spark/pom.xml
@@ -213,9 +213,9 @@

<!-- Spark (Packages) -->
<dependency>
<groupId>com.databricks</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
<scope>provided</scope>
Member:
I understand we can't bundle this since it's tied to a Spark version now. spark-avro is still a package and the user must explicitly include it using --packages? So, if I upgrade Hudi in the next release, then as a user I need to change something? Should/how do we document this?

Contributor Author:
Yeah, this will be an additional jar the user would have to pass while starting the spark-shell. We would have to document it. I don't see any documentation for spark.serializer=org.apache.spark.serializer.KryoSerializer either, which is also a pre-requisite, right? Shall we update it in the README?

Member:
The KryoSerializer command line is provided on the quickstart page. Adding it there could be a good thing. Do a follow-on update?

</dependency>

<!-- Hadoop -->
@@ -239,8 +239,19 @@
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>${hive.exec.classifier}</classifier>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
103 changes: 83 additions & 20 deletions hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -1,13 +1,12 @@
/*
* This code is copied from com.databricks:spark-avro with following license
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* Copyright 2014 Databricks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,21 +21,32 @@ import java.nio.ByteBuffer
import java.sql.{Date, Timestamp}
import java.util

import com.databricks.spark.avro.SchemaConverters
import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._

object AvroConversionHelper {

private def createDecimal(decimal: java.math.BigDecimal, precision: Int, scale: Int): Decimal = {
if (precision <= Decimal.MAX_LONG_DIGITS) {
// Constructs a `Decimal` with an unscaled `Long` value if possible.
Decimal(decimal.unscaledValue().longValue(), precision, scale)
} else {
// Otherwise, resorts to an unscaled `BigInteger` instead.
Decimal(decimal, precision, scale)
}
}

/**
*
* Returns a converter function to convert row in avro format to GenericRow of catalyst.
@@ -76,7 +86,50 @@ object AvroConversionHelper {
byteBuffer.get(bytes)
bytes
}

case (d: DecimalType, FIXED) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
val decimalConversion = new DecimalConversion
val bigDecimal = decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
LogicalTypes.decimal(d.precision, d.scale))
createDecimal(bigDecimal, d.precision, d.scale)
}
case (d: DecimalType, BYTES) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
val decimalConversion = new DecimalConversion
val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
LogicalTypes.decimal(d.precision, d.scale))
createDecimal(bigDecimal, d.precision, d.scale)
}
case (DateType, INT) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
new Date(item.asInstanceOf[Long])
}
case (TimestampType, LONG) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
avroSchema.getLogicalType match {
case _: TimestampMillis =>
new Timestamp(item.asInstanceOf[Long])
case _: TimestampMicros =>
new Timestamp(item.asInstanceOf[Long] / 1000)
case null =>
new Timestamp(item.asInstanceOf[Long])
case other =>
throw new IncompatibleSchemaException(
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
}
}
case (struct: StructType, RECORD) =>
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
@@ -216,7 +269,8 @@ object AvroConversionHelper {
createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
}

def createConverterToAvro(dataType: DataType,
def createConverterToAvro(avroSchema: Schema,
dataType: DataType,
structName: String,
recordNamespace: String): Any => Any = {
dataType match {
@@ -231,13 +285,22 @@
if (item == null) null else item.asInstanceOf[Byte].intValue
case ShortType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Short].intValue
case _: DecimalType => (item: Any) => if (item == null) null else item.toString
case dec: DecimalType => (item: Any) =>
Option(item).map { i =>
val bigDecimalValue = item.asInstanceOf[java.math.BigDecimal]
val decimalConversions = new DecimalConversion()
decimalConversions.toFixed(bigDecimalValue, avroSchema.getField(structName).schema().getTypes.get(0),
LogicalTypes.decimal(dec.precision, dec.scale))
}.orNull
case TimestampType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Timestamp].getTime
// Convert time to microseconds since spark-avro by default converts TimestampType to
// Avro Logical TimestampMicros
Option(item).map(_.asInstanceOf[Timestamp].getTime * 1000).orNull
case DateType => (item: Any) =>
if (item == null) null else item.asInstanceOf[Date].getTime
Option(item).map(_.asInstanceOf[Date].toLocalDate.toEpochDay.toInt).orNull
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(
avroSchema,
elementType,
structName,
getNewRecordNamespace(elementType, recordNamespace, structName))
@@ -258,6 +321,7 @@
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(
avroSchema,
valueType,
structName,
getNewRecordNamespace(valueType, recordNamespace, structName))
@@ -273,11 +337,10 @@
}
}
case structType: StructType =>
val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
val schema: Schema = SchemaConverters.convertStructToAvro(
structType, builder, recordNamespace)
val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
avroSchema,
field.dataType,
field.name,
getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
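The converters added in this file cover three representative cases: choosing a compact unscaled-long representation for small decimals (createDecimal), scaling Avro TimestampMicros down to milliseconds on read, and writing dates as epoch days. A JDK-only sketch of that arithmetic (class and method names are hypothetical; the real code uses Spark's Decimal and Avro's DecimalConversion):

```java
import java.sql.Date;
import java.sql.Timestamp;

// Standalone sketch of three conversions from AvroConversionHelper above.
public class AvroConversionSketch {

    // Mirrors Spark's Decimal.MAX_LONG_DIGITS: up to 18 decimal digits of
    // precision fit in an unscaled long.
    static final int MAX_LONG_DIGITS = 18;

    // The representation choice made by createDecimal: compact long form
    // when possible, BigDecimal-backed form otherwise.
    static boolean fitsInLong(int precision) {
        return precision <= MAX_LONG_DIGITS;
    }

    // TimestampMicros -> java.sql.Timestamp: divide by 1000 to get millis,
    // as in the (TimestampType, LONG) reader case.
    static Timestamp fromMicros(long micros) {
        return new Timestamp(micros / 1000);
    }

    // DateType -> Avro int: days since the Unix epoch, as in the writer path.
    static int toEpochDay(Date date) {
        return (int) date.toLocalDate().toEpochDay();
    }

    public static void main(String[] args) {
        System.out.println(fitsInLong(10));                          // true
        System.out.println(fromMicros(1_000_000L).getTime());        // 1000
        System.out.println(toEpochDay(Date.valueOf("1970-01-02")));  // 1
    }
}
```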
@@ -17,11 +17,11 @@

package org.apache.hudi

import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericRecord
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -30,13 +30,20 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object AvroConversionUtils {

def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
createRdd(df, avroSchema.toString, structName, recordNamespace)
}

def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
: RDD[GenericRecord] = {
val dataType = df.schema
val encoder = RowEncoder.apply(dataType).resolveAndBind()
df.queryExecution.toRdd.map(encoder.fromRow)
.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
val convertor = AvroConversionHelper.createConverterToAvro(avroSchema, dataType, structName, recordNamespace)
records.map { x => convertor(x).asInstanceOf[GenericRecord] }
}
}
@@ -75,11 +82,10 @@
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
}

def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType];
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
}
1 change: 1 addition & 0 deletions hudi-spark/src/test/java/HoodieJavaApp.java
@@ -104,6 +104,7 @@ public void run() throws Exception {
SparkSession spark = SparkSession.builder().appName("Hoodie Spark APP")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
JavaSparkContext jssc = new JavaSparkContext(spark.sparkContext());
spark.sparkContext().setLogLevel("WARN");
FileSystem fs = FileSystem.get(jssc.hadoopConfiguration());

// Generator of some records to be loaded in.
2 changes: 1 addition & 1 deletion hudi-utilities/pom.xml
@@ -161,7 +161,7 @@
</dependency>

<dependency>
<groupId>com.databricks</groupId>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<scope>provided</scope>
</dependency>
14 changes: 2 additions & 12 deletions packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -89,7 +89,7 @@
</relocation>
<relocation>
<pattern>org.apache.avro.</pattern>
<shadedPattern>${mr.bundle.avro.shade.prefix}org.apache.avro.</shadedPattern>
<shadedPattern>org.apache.hudi.org.apache.avro.</shadedPattern>
</relocation>
</relocations>
<createDependencyReducedPom>false</createDependencyReducedPom>
@@ -143,17 +143,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>${mr.bundle.avro.scope}</scope>
<scope>compile</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>mr-bundle-shade-avro</id>
Member:
@bvaradar @n3nash: calling this to your attention. Is removing the profile going to affect the custom payload implementations at Uber?

Contributor:
@umehrot2 (cc @vinothchandar) I will get back on this by today EOD.

Contributor:
(cc @n3nash) Yeah, this would mean that we need to employ the same package relocation in the jar carrying custom record payloads. As discussed in the earlier threads, there is no way around it. @umehrot2: We would need to document this caveat in the Release Notes and add documentation on how to shade it. Can you create a ticket to track this?

Contributor:
@vinothchandar @bvaradar Yes, this will affect the custom payload implementation on the reader side. But we are anyway going to make some changes in how the payload packages are loaded, so we should be able to absorb this change as part of those considerations.

<properties>
<mr.bundle.avro.scope>compile</mr.bundle.avro.scope>
<mr.bundle.avro.shade.prefix>org.apache.hudi.</mr.bundle.avro.shade.prefix>
</properties>
</profile>
</profiles>
</project>
6 changes: 0 additions & 6 deletions packaging/hudi-spark-bundle/pom.xml
@@ -94,8 +94,6 @@
<include>org.apache.hive:hive-service-rpc</include>
<include>org.apache.hive:hive-metastore</include>
<include>org.apache.hive:hive-jdbc</include>

<include>com.databricks:spark-avro_2.11</include>
Member:
If we bundled org.apache.spark:spark-avro, wouldn't that make life simpler for everyone?

Contributor Author:
I can give it a shot, but we need to carefully understand the consequences of shading a Spark library inside a jar which is being run on Spark. I remember we had some issue on EMR earlier, but I don't have the exact details. Nevertheless, let me try and see if the tests pass.

Member:
I assume the tests will pass, but I realize what you are saying: the user could be running on a higher Spark version, say, and we would be bundling 2.4.4. Let's just open a JIRA to tackle this usability issue and keep it as-is for now. We can clearly document the need for --packages ... when using spark-submit or spark-shell, and move on.

Contributor Author:
Yeah, that can be one of the problems. Created a JIRA for this issue: https://issues.apache.org/jira/browse/HUDI-516

About documentation of --packages, are you going to take care of that?

</includes>
</artifactSet>
<relocations>
@@ -139,10 +137,6 @@
<pattern>org.apache.commons.codec.</pattern>
<shadedPattern>org.apache.hudi.org.apache.commons.codec.</shadedPattern>
</relocation>
<relocation>
<pattern>com.databricks.</pattern>
<shadedPattern>org.apache.hudi.com.databricks.</shadedPattern>
</relocation>
<!-- TODO: Revisit GH ISSUE #533 & PR#633-->
</relocations>
<filters>