Skip to content

Commit

Permalink
starting fix on how we extract direct relation properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob-Eliat-Eliat committed Jun 27, 2024
1 parent 402485d commit 3b4ad4a
Showing 1 changed file with 66 additions and 58 deletions.
124 changes: 66 additions & 58 deletions src/main/scala/cognite/spark/v1/FlexibleDataModelRelationUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,26 @@ object FlexibleDataModelRelationUtils {
row: Row): Either[CdfSparkException, DirectRelationReference] =
extractDirectRelation("endNode", "Edge end node", schema, defaultSpace, row)

/**
  * Builds a [[DirectRelationReference]] from a struct row describing a direct relation.
  *
  * The space is read from the struct's `space` field; if reading `space` fails, the
  * `spaceExternalId` field is tried instead. When neither yields a non-null value,
  * `defaultSpace` is used. The external id is read from the struct's `externalId` field.
  *
  * @param propertyName    name of the property being extracted (only used in the error message)
  * @param descriptiveName human readable name of the property (only used in the error message)
  * @param defaultSpace    space to fall back to when the struct does not supply one
  * @param struct          row expected to carry `space` and `externalId` fields
  * @return `Right` with the reference when both a space and an external id are available,
  *         otherwise `Left` with a [[CdfSparkException]] that includes the offending row
  */
private def extractDirectRelationRefereceFromStruct(
    propertyName: String,
    descriptiveName: String,
    defaultSpace: Option[String],
    struct: Row): Either[Throwable, DirectRelationReference] = {
  val space = Try(Option(struct.getAs[Any]("space")))
    .orElse(Try(Option(struct.getAs[Any]("spaceExternalId")))) // in case fdm v2 utils are used
    .getOrElse(None)
    .orElse(defaultSpace)
  // A failed lookup (e.g. the field is absent from the struct) is treated like a null value,
  // so the caller receives the descriptive Left below instead of a thrown exception.
  val externalId = Try(Option(struct.getAs[Any]("externalId"))).getOrElse(None)
  (space, externalId) match {
    // Both values must be present: matching on Some unwraps them, so String.valueOf sees the
    // actual value rather than the Option wrapper.
    case (Some(s), Some(e)) =>
      Right(DirectRelationReference(space = String.valueOf(s), externalId = String.valueOf(e)))
    case _ =>
      Left(new CdfSparkException(s"""
        |'$propertyName' ($descriptiveName) should be a 'StructType' with 'space' & 'externalId' properties
        |in data row: ${rowToString(struct)}
        |""".stripMargin))
  }
}

private def extractDirectRelation(
propertyName: String,
descriptiveName: String,
Expand All @@ -472,14 +492,11 @@ object FlexibleDataModelRelationUtils {
row: Row): Either[CdfSparkException, DirectRelationReference] =
Try {
val struct = row.getStruct(schema.fieldIndex(propertyName))
val space = Try(Option(struct.getAs[Any]("space")))
.orElse(Try(Option(struct.getAs[Any]("spaceExternalId")))) // just in case of fdm v2 utils are used
.getOrElse(None)
.orElse(defaultSpace)
val externalId = Option(struct.getAs[Any]("externalId"))
Apply[Option].map2(space, externalId) {
case (s, e) => DirectRelationReference(space = String.valueOf(s), externalId = String.valueOf(e))
}
extractDirectRelationRefereceFromStruct(
propertyName,
descriptiveName,
defaultSpace,
struct)
} match {
case Success(Some(relation)) => Right(relation)
case Success(None) =>
Expand Down Expand Up @@ -615,15 +632,8 @@ object FlexibleDataModelRelationUtils {
val instancePropertyValueResult = propDef match {
case corePropDef: PropertyDefinition.ViewCorePropertyDefinition =>
corePropDef.`type` match {
case _: DirectNodeRelationProperty =>
directNodeRelationToInstancePropertyValue(
row,
schema,
propertyName,
corePropDef,
instanceSpace)
case t if t.isList => toInstancePropertyValueOfList(row, schema, propertyName, corePropDef)
case _ => toInstancePropertyValueOfNonList(row, schema, propertyName, corePropDef)
case t if t.isList => toInstancePropertyValueOfList(row, schema, propertyName, corePropDef, instanceSpace)
case _ => toInstancePropertyValueOfNonList(row, schema, propertyName, corePropDef, instanceSpace)
}
case _: PropertyDefinition.ConnectionDefinition =>
lookupFieldInRow(row, schema, propertyName, true) { _ =>
Expand Down Expand Up @@ -653,7 +663,7 @@ object FlexibleDataModelRelationUtils {

// UTC zone id, resolved once here; its consumers are outside this excerpt.
private val timezoneId: ZoneId = ZoneId.of("UTC")

private def lookupFieldInRow(row: Row, schema: StructType, propertyName: String, nullable: Boolean)(
private def lookupFieldInRow(row: Row, schema: StructType, propertyName: String, nullable: Boolean)(
get: => Int => Either[Throwable, InstancePropertyValue])
: Either[Throwable, OptionalField[InstancePropertyValue]] =
Try(schema.fieldIndex(propertyName)) match {
Expand All @@ -673,52 +683,59 @@ object FlexibleDataModelRelationUtils {
row: Row,
schema: StructType,
propertyName: String,
propDef: CorePropertyDefinition): Either[Throwable, OptionalField[InstancePropertyValue]] =
propDef: CorePropertyDefinition,
instanceSpace: Option[String]): Either[Throwable, OptionalField[InstancePropertyValue]] =
lookupFieldInRow(row, schema, propertyName, propDef.nullable.getOrElse(true)) { i =>
propDef.`type` match {
case p: TextProperty if p.isList =>
case p: DirectNodeRelationProperty =>
val relSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
InstancePropertyValue.ViewDirectNodeRelationList(relSeq.map( struct =>
extractDirectRelationRefereceFromStruct(instanceSpace, row)
)

case p: TextProperty =>
val strSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
Try(InstancePropertyValue.StringList(skipNulls(strSeq).map(String.valueOf))).toEither
case p @ PrimitiveProperty(PrimitivePropType.Boolean, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Boolean, _) =>
val boolSeq = Try(row.getSeq[Boolean](i)).getOrElse(row.getAs[Array[Boolean]](i).toSeq)
Try(InstancePropertyValue.BooleanList(boolSeq)).toEither
case p @ PrimitiveProperty(PrimitivePropType.Float32, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Float32, _) =>
val floatSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsFloatSeq(floatSeq, propertyName)
.map(InstancePropertyValue.Float32List)
case p @ PrimitiveProperty(PrimitivePropType.Float64, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Float64, _) =>
val doubleSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsDoubleSeq(doubleSeq, propertyName)
.map(InstancePropertyValue.Float64List)
case p @ PrimitiveProperty(PrimitivePropType.Int32, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Int32, _) =>
val intSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsIntSeq(intSeq, propertyName)
.map(InstancePropertyValue.Int32List)
case p @ PrimitiveProperty(PrimitivePropType.Int64, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Int64, _) =>
val longSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsLongSeq(longSeq, propertyName)
.map(InstancePropertyValue.Int64List)
case p @ PrimitiveProperty(PrimitivePropType.Timestamp, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Timestamp, _) =>
val tsSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsTimestamps(tsSeq, propertyName).map(InstancePropertyValue.TimestampList)
case p @ PrimitiveProperty(PrimitivePropType.Date, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Date, _) =>
val dateSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
tryAsDates(dateSeq, propertyName).map(InstancePropertyValue.DateList)
case p @ PrimitiveProperty(PrimitivePropType.Json, _) if p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Json, _) =>
val strSeq = Try(row.getSeq[String](i)).getOrElse(row.getAs[Array[String]](i).toSeq)
skipNulls(strSeq).toVector
.traverse(io.circe.parser.parse)
.map(InstancePropertyValue.ObjectList.apply)
.leftMap(e =>
new CdfSparkException(
s"Error parsing value of field '$propertyName' as a list of json objects: ${e.getMessage}"))
case p: TimeSeriesReference if p.isList =>
case p: TimeSeriesReference =>
val strSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
Try(InstancePropertyValue.TimeSeriesReferenceList(skipNulls(strSeq).map(String.valueOf))).toEither
case p: FileReference if p.isList =>
case p: FileReference =>
val strSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
Try(InstancePropertyValue.FileReferenceList(skipNulls(strSeq).map(String.valueOf))).toEither
case p: SequenceReference if p.isList =>
case p: SequenceReference =>
val strSeq = Try(row.getSeq[Any](i)).getOrElse(row.getAs[Array[Any]](i).toSeq)
Try(InstancePropertyValue.SequenceReferenceList(skipNulls(strSeq).map(String.valueOf))).toEither
case t => Left(new CdfSparkException(s"Unhandled list type: ${t.toString}"))
Expand All @@ -730,58 +747,49 @@ object FlexibleDataModelRelationUtils {
row: Row,
schema: StructType,
propertyName: String,
propDef: CorePropertyDefinition): Either[Throwable, OptionalField[InstancePropertyValue]] =
propDef: CorePropertyDefinition,
instanceSpace: Option[String]): Either[Throwable, OptionalField[InstancePropertyValue]] =
lookupFieldInRow(row, schema, propertyName, propDef.nullable.getOrElse(true)) { i =>
propDef.`type` match {
case p: TextProperty if !p.isList =>
case p: DirectNodeRelationProperty =>
extractDirectRelation(propertyName, "Direct Node Relation", schema, instanceSpace, row)
.map(_.asJson)
.map(InstancePropertyValue.Object)
case p: TextProperty =>
Try(InstancePropertyValue.String(String.valueOf(row.get(i)))).toEither
case p @ PrimitiveProperty(PrimitivePropType.Boolean, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Boolean, _) =>
Try(InstancePropertyValue.Boolean(row.getBoolean(i))).toEither
case p @ PrimitiveProperty(PrimitivePropType.Float32, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Float32, _) =>
tryAsFloat(row.get(i), propertyName).map(InstancePropertyValue.Float32)
case p @ PrimitiveProperty(PrimitivePropType.Float64, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Float64, _) =>
tryAsDouble(row.get(i), propertyName).map(InstancePropertyValue.Float64)
case p @ PrimitiveProperty(PrimitivePropType.Int32, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Int32, _) =>
tryAsInt(row.get(i), propertyName).map(InstancePropertyValue.Int32)
case p @ PrimitiveProperty(PrimitivePropType.Int64, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Int64, _) =>
tryAsLong(row.get(i), propertyName).map(InstancePropertyValue.Int64)
case p @ PrimitiveProperty(PrimitivePropType.Timestamp, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Timestamp, _) =>
tryAsTimestamp(row.get(i), propertyName).map(InstancePropertyValue.Timestamp.apply)
case p @ PrimitiveProperty(PrimitivePropType.Date, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Date, _) =>
tryAsDate(row.get(i), propertyName).map(InstancePropertyValue.Date.apply)
case p @ PrimitiveProperty(PrimitivePropType.Json, _) if !p.isList =>
case p @ PrimitiveProperty(PrimitivePropType.Json, _) =>
io.circe.parser
.parse(row
.getString(i))
.map(InstancePropertyValue.Object.apply)
.leftMap(e =>
new CdfSparkException(
s"Error parsing value of field '$propertyName' as a json object: ${e.getMessage}"))
case p: TimeSeriesReference if !p.isList =>
case p: TimeSeriesReference =>
Try(InstancePropertyValue.TimeSeriesReference(String.valueOf(row.get(i)))).toEither
case p: FileReference if !p.isList =>
case p: FileReference =>
Try(InstancePropertyValue.FileReference(String.valueOf(row.get(i)))).toEither
case p: SequenceReference if !p.isList =>
case p: SequenceReference =>
Try(InstancePropertyValue.SequenceReference(String.valueOf(row.get(i)))).toEither
case t => Left(new CdfSparkException(s"Unhandled non-list type: ${t.toString}"))
}
}
// scalastyle:on cyclomatic.complexity method.length

/**
  * Converts a direct node relation column of `row` into an instance property value.
  *
  * The field is resolved through `lookupFieldInRow`, treating the property as nullable when
  * the definition does not say otherwise. The relation extracted from the row is encoded as
  * JSON and wrapped in an `InstancePropertyValue.Object`.
  *
  * @param row          data row holding the relation struct
  * @param schema       schema of `row`
  * @param propertyName name of the relation property
  * @param propDef      definition of the property, consulted for its nullability
  * @param defaultSpace space handed to `extractDirectRelation` as its default
  * @return the wrapped property value, or the error reported by the lookup / extraction
  */
private def directNodeRelationToInstancePropertyValue(
    row: Row,
    schema: StructType,
    propertyName: String,
    propDef: CorePropertyDefinition,
    defaultSpace: Option[String]): Either[Throwable, OptionalField[InstancePropertyValue]] =
  lookupFieldInRow(row, schema, propertyName, nullable = propDef.nullable.getOrElse(true)) { _ =>
    for {
      relation <- extractDirectRelation(
        propertyName,
        "Direct Node Relation",
        schema,
        defaultSpace,
        row)
    } yield InstancePropertyValue.Object(relation.asJson)
  }

private def tryAsLong(n: Any, propertyName: String): Either[CdfSparkException, Long] = {
val nAsStr = String.valueOf(n)
val bd = BigDecimal(nAsStr)
Expand Down

0 comments on commit 3b4ad4a

Please sign in to comment.