Skip to content

Commit

Permalink
Fix Avro serialization: generate the schema once and reuse it (#52)
Browse files Browse the repository at this point in the history
* Generate the Avro schema once up front and reuse it across serialization calls

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* format

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
  • Loading branch information
pyalex committed Apr 5, 2021
1 parent c768f12 commit 927ffbe
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ class BigTableSinkRelation(
val featureFields = data.schema.fields
.filterNot(f => isSystemColumn(f.name))

val featureColumns = featureFields.map(f => col(f.name))
val featureColumns = featureFields.map(f => data(f.name))

val entityColumns = config.entityColumns.map(c => col(c).cast(StringType))
val schemaReference = serializer.schemaReference(StructType(featureFields))
val entityColumns = config.entityColumns.map(c => data(c).cast(StringType))
val schema = serializer.convertSchema(StructType(featureFields))
val schemaReference = serializer.schemaReference(schema)

data
.select(
joinEntityKey(struct(entityColumns: _*)).alias("key"),
serializer.serializeData(struct(featureColumns: _*)).alias("value"),
serializer.serializeData(schema)(struct(featureColumns: _*)).alias("value"),
col(config.timestampColumn).alias("ts")
)
.where(length(col("key")) > 0)
Expand All @@ -117,12 +118,12 @@ class BigTableSinkRelation(
.filterNot(f => isSystemColumn(f.name))
val featureSchema = StructType(featureFields)

val key = schemaKeyPrefix.getBytes ++ serializer.schemaReference(featureSchema)
val serializedSchema = serializer.serializeSchema(featureSchema).getBytes
val schema = serializer.convertSchema(featureSchema)
val key = schemaKeyPrefix.getBytes ++ serializer.schemaReference(schema)

val put = new Put(key)
val qualifier = "avro".getBytes
put.addColumn(metadataColumnFamily.getBytes, qualifier, serializedSchema)
put.addColumn(metadataColumnFamily.getBytes, qualifier, schema.asInstanceOf[String].getBytes)

val btConn = BigtableConfiguration.connect(hadoopConfig)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.types.StructType

/**
 * Avro implementation of [[Serializer]].
 *
 * The schema is converted from the Spark SQL [[StructType]] exactly once
 * (via [[convertSchema]]) and then passed explicitly to [[schemaReference]]
 * and [[serializeData]], so the Avro schema generation is not repeated for
 * every row/partition being serialized.
 */
class AvroSerializer extends Serializer {
  // Avro schemas are carried around as their JSON string representation.
  override type SchemaType = String

  /** Converts a Spark SQL schema into its Avro schema JSON string. */
  def convertSchema(schema: StructType): String = {
    val avroSchema = SchemaConverters.toAvroType(schema)
    avroSchema.toString
  }

  /**
   * Compact, stable 4-byte reference for a schema: the murmur3_32 hash of
   * the schema's JSON bytes. Used as a lookup key for the stored schema.
   */
  def schemaReference(schema: String): Array[Byte] =
    Hashing.murmur3_32().hashBytes(schema.getBytes).asBytes()

  /**
   * Returns a column transformer that Avro-encodes a struct column using the
   * pre-generated schema (avoids re-deriving the schema per invocation).
   */
  def serializeData(schema: String): Column => Column = to_avro(_, schema)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import org.apache.spark.sql.Column
import org.apache.spark.sql.types.StructType

/**
 * Contract for serializing Spark rows and their schema for sink storage.
 *
 * Implementations pre-convert the Spark SQL schema once via [[convertSchema]]
 * and thread the resulting [[SchemaType]] value through [[schemaReference]]
 * and [[serializeData]], so schema generation is not repeated per call.
 */
trait Serializer {
  /** Implementation-specific representation of a converted schema. */
  type SchemaType

  /** Converts a Spark SQL schema into this serializer's schema representation. */
  def convertSchema(schema: StructType): SchemaType

  /** Compact, stable byte reference (e.g. a hash) identifying the given schema. */
  def schemaReference(schema: SchemaType): Array[Byte]

  /** Column transformer that serializes a struct column using the given schema. */
  def serializeData(schema: SchemaType): Column => Column
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ class RowValidator(featureTable: FeatureTable, timestampColumn: String) extends
col(timestampColumn).isNotNull

/**
 * Combined row-validity predicate: every entity column is present and the
 * timestamp column is non-null. Note: the at-least-one-feature-not-null
 * check was intentionally dropped in this change.
 */
def allChecks: Column =
  allEntitiesPresent && timestampPresent
}

0 comments on commit 927ffbe

Please sign in to comment.