Skip to content

Commit

Permalink
Fixing _rawBody handling and adding option to maintain _ts and _etag …
Browse files Browse the repository at this point in the history
…properties from origin (#26820)

* Fixing _rawBody handling and adding option to maintain _ts and _etag properties from origin

* Reacted to CR feedback
  • Loading branch information
FabianMeiswinkel committed Feb 2, 2022
1 parent a02a06c commit 44f50b8
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,39 +100,72 @@ private class CosmosRowConverter(

/**
 * Converts a Spark `Row` into a Jackson `ObjectNode`.
 *
 * Fast path: if the row carries a pre-serialized JSON document — either in the
 * `_rawBody` column or in the `_origin_rawBody` column (the column must be of
 * `StringType`) — the string is parsed directly instead of converting the row
 * field-by-field. `_rawBody` takes precedence over `_origin_rawBody` when both
 * are present.
 *
 * @param row the source Spark row
 * @return the JSON object representation of the row
 */
def fromRowToObjectNode(row: Row): ObjectNode = {

  // Pick the raw-body column to use, if any. NOTE: the diff this file was
  // recovered from had the old and new implementations interleaved; this is
  // the reconstructed post-change logic.
  val rawBodyFieldName: Option[String] =
    if (row.schema.names.contains(CosmosTableSchemaInferrer.RawJsonBodyAttributeName) &&
      row.schema.apply(CosmosTableSchemaInferrer.RawJsonBodyAttributeName).dataType.isInstanceOf[StringType]) {
      Some(CosmosTableSchemaInferrer.RawJsonBodyAttributeName)
    } else if (row.schema.names.contains(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName) &&
      row.schema.apply(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName).dataType.isInstanceOf[StringType]) {
      Some(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName)
    } else {
      None
    }

  rawBodyFieldName match {
    case Some(fieldName) =>
      // Special case when the reader read the rawJson: parse the stored JSON
      // string and (for the origin column) mirror _ts/_etag as origin properties.
      val rawJson = row.getAs[String](fieldName)
      convertRawBodyJsonToObjectNode(rawJson, fieldName)
    case None =>
      // Generic path: convert every schema field into a JSON node.
      val objectNode: ObjectNode = objectMapper.createObjectNode()
      row.schema.fields.zipWithIndex.foreach({
        case (field, i) =>
          field.dataType match {
            case _: NullType => putNullConditionally(objectNode, field.name)
            case _ if row.isNullAt(i) => putNullConditionally(objectNode, field.name)
            case _ =>
              val nodeOpt = convertSparkDataTypeToJsonNode(field.dataType, row.get(i))
              if (nodeOpt.isDefined) {
                objectNode.set(field.name, nodeOpt.get)
              }
          }
      })
      objectNode
  }
}

/**
 * Parses a raw JSON document body into an `ObjectNode`.
 *
 * When the body originated from the `_origin_rawBody` column, the source
 * document's `_ts` and `_etag` values are additionally mirrored under the
 * origin-prefixed property names (`_origin_ts` / `_origin_etag`) so they
 * survive the write to the target container.
 *
 * @param json             the raw JSON string to parse
 * @param rawBodyFieldName the column the JSON was read from
 * @return the parsed (and possibly enriched) document
 */
private def convertRawBodyJsonToObjectNode(json: String, rawBodyFieldName: String): ObjectNode = {
  val parsedDocument = objectMapper.readTree(json).asInstanceOf[ObjectNode]

  rawBodyFieldName match {
    case CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName =>
      // Mirror the origin system properties under origin-prefixed names.
      parsedDocument.set(
        CosmosTableSchemaInferrer.OriginTimestampAttributeName,
        parsedDocument.get(CosmosTableSchemaInferrer.TimestampAttributeName))
      parsedDocument.set(
        CosmosTableSchemaInferrer.OriginETagAttributeName,
        parsedDocument.get(CosmosTableSchemaInferrer.ETagAttributeName))
    case _ =>
      // Plain _rawBody: nothing to enrich.
  }

  parsedDocument
}

def fromInternalRowToObjectNode(row: InternalRow, schema: StructType): ObjectNode = {
if (schema.contains(StructField(CosmosTableSchemaInferrer.RawJsonBodyAttributeName, StringType))){
val rawBodyFieldIndex = schema.fieldIndex(CosmosTableSchemaInferrer.RawJsonBodyAttributeName)

val rawBodyFieldName = if (schema.names.contains(CosmosTableSchemaInferrer.RawJsonBodyAttributeName) &&
schema.apply(CosmosTableSchemaInferrer.RawJsonBodyAttributeName).dataType.isInstanceOf[StringType]) {
Some(CosmosTableSchemaInferrer.RawJsonBodyAttributeName)
} else if (schema.names.contains(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName) &&
schema.apply(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName).dataType.isInstanceOf[StringType]) {
Some(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName)
} else {
None
}

if (rawBodyFieldName.isDefined){
val rawBodyFieldIndex = schema.fieldIndex(rawBodyFieldName.get)
// Special case when the reader read the rawJson
val rawJson = convertRowDataToString(row.get(rawBodyFieldIndex, StringType))
objectMapper.readTree(rawJson).asInstanceOf[ObjectNode]
}
else
{
convertRawBodyJsonToObjectNode(rawJson, rawBodyFieldName.get)
} else {
val objectNode: ObjectNode = objectMapper.createObjectNode()
schema.fields.zipWithIndex.foreach({
case (field, i) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ private object CosmosTableSchemaInferrer
extends BasicLoggingTrait {

// Column carrying the document as one pre-serialized JSON string (fast path
// in CosmosRowConverter.fromRowToObjectNode).
private[spark] val RawJsonBodyAttributeName = "_rawBody"
// Same as _rawBody, but marks the body as coming from an origin container;
// triggers mirroring of _ts/_etag into origin-prefixed properties.
private[spark] val OriginRawJsonBodyAttributeName = "_origin_rawBody"
// Cosmos DB system property: last-modified timestamp of the document.
private[spark] val TimestampAttributeName = "_ts"
// Copy of the origin document's _ts, preserved across writes.
private[spark] val OriginTimestampAttributeName = "_origin_ts"
// Document identifier property.
private[spark] val IdAttributeName = "id"
// Cosmos DB system property: entity tag used for optimistic concurrency.
private[spark] val ETagAttributeName = "_etag"
// Copy of the origin document's _etag, preserved across writes.
private[spark] val OriginETagAttributeName = "_origin_etag"
// Cosmos DB system properties — usage not visible in this chunk; presumably
// treated as system columns by the schema inferrer (TODO confirm).
private[spark] val SelfAttributeName = "_self"
private[spark] val ResourceIdAttributeName = "_rid"
private[spark] val AttachmentsAttributeName = "_attachments"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, Uuid}

import java.sql.{Date, Timestamp}
import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, OffsetDateTime, ZoneOffset}
import java.util.UUID
import scala.util.Random

// scalastyle:off underscore.import
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -669,6 +671,54 @@ class CosmosRowConverterSpec extends UnitSpec with BasicLoggingTrait {
objectNode.get(colName2).asText shouldEqual colVal2
}

"originRawJson in spark row" should "translate to ObjectNode" in {
  val firstColumn = "testCol1"
  val secondColumn = "testCol2"
  val firstValue = 8
  val secondValue = "strVal"
  val timestampValue = Random.nextInt(100000)
  val etagValue = UUID.randomUUID().toString

  // Origin document that will travel through the row as one raw JSON string.
  val originDocument: ObjectNode = objectMapper.createObjectNode()
  originDocument.put(firstColumn, firstValue)
  originDocument.put(secondColumn, secondValue)
  originDocument.put(CosmosTableSchemaInferrer.TimestampAttributeName, timestampValue)
  originDocument.put(CosmosTableSchemaInferrer.ETagAttributeName, etagValue)

  val schema = StructType(
    Seq(StructField(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName, StringType)))
  val row = new GenericRowWithSchema(Array(originDocument.toString), schema)

  val converted = defaultRowConverter.fromRowToObjectNode(row)

  converted.get(firstColumn).asInt shouldEqual firstValue
  converted.get(secondColumn).asText shouldEqual secondValue
  // _ts/_etag from the origin must be mirrored under origin-prefixed names.
  converted.get(CosmosTableSchemaInferrer.OriginTimestampAttributeName).asInt shouldEqual timestampValue
  converted.get(CosmosTableSchemaInferrer.OriginETagAttributeName).asText shouldEqual etagValue
}

"originRawJson in spark InternalRow" should "translate to ObjectNode" in {
  val firstColumn = "testCol1"
  val secondColumn = "testCol2"
  val firstValue = 8
  val secondValue = "strVal"
  val timestampValue = Random.nextInt(100000)
  val etagValue = UUID.randomUUID().toString

  // Origin document carried through the InternalRow as one raw JSON string.
  val originDocument: ObjectNode = objectMapper.createObjectNode()
  originDocument.put(firstColumn, firstValue)
  originDocument.put(secondColumn, secondValue)
  originDocument.put(CosmosTableSchemaInferrer.TimestampAttributeName, timestampValue)
  originDocument.put(CosmosTableSchemaInferrer.ETagAttributeName, etagValue)

  val internalRow = InternalRow(originDocument.toString)
  val schema = StructType(
    Seq(StructField(CosmosTableSchemaInferrer.OriginRawJsonBodyAttributeName, StringType)))

  val converted = defaultRowConverter.fromInternalRowToObjectNode(internalRow, schema)

  converted.get(firstColumn).asInt shouldEqual firstValue
  converted.get(secondColumn).asText shouldEqual secondValue
  // _ts/_etag from the origin must be mirrored under origin-prefixed names.
  converted.get(CosmosTableSchemaInferrer.OriginTimestampAttributeName).asInt shouldEqual timestampValue
  converted.get(CosmosTableSchemaInferrer.OriginETagAttributeName).asText shouldEqual etagValue
}

"basic ObjectNode" should "translate to Row" in {
val colName1 = "testCol1"
val colName2 = "testCol2"
Expand Down

0 comments on commit 44f50b8

Please sign in to comment.