Skip to content
This repository has been archived by the owner on Nov 20, 2019. It is now read-only.

Commit

Permalink
[CROSSDATA-808] Fix Map<Timestamp, String> serialization (#760)
Browse files Browse the repository at this point in the history
* fix 'MapType' serialization: serialize any kind of key supported by spark appart from string

* Merge Fix
  • Loading branch information
darroyo-stratio authored and mafernandez-stratio committed Oct 31, 2016
1 parent 274ff66 commit 9f2e3a1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case (ArrayType(ty, _), JArray(arr)) =>
mutable.WrappedArray make arr.map(extractField(ty, _)).toArray
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
case (MapType(StringType, vt, _), JObject(fields)) =>
val (keys, values) = fields.unzip
val unserValues = values map (jval => extractField(vt, jval))
ArrayBasedMapDataNotDeprecated(keys.toArray, unserValues.toArray)
case (MapType(kt, vt, _), JObject(JField("map", JObject(JField("keys", JArray(mapKeys)) :: JField("values", JArray(mapValues)) :: _) ) :: _)) =>
val unserKeys = mapKeys map (jval => extractField(kt, jval))
val unserValues = mapValues map (jval => extractField(vt, jval))
ArrayBasedMapDataNotDeprecated(unserKeys.toArray, unserValues.toArray)
case (st: StructType, JObject(JField("values",JArray(values))::_)) =>
deserializeWithSchema(st, values, true)
}
Expand Down Expand Up @@ -106,14 +106,15 @@ case class RowSerializer(providedSchema: StructType) extends Serializer[Row] {
case v: ArrayDataNotDeprecated => JArray(v.array.toList.map(v => Extraction.decompose(v)))
case v: mutable.WrappedArray[_] => JArray(v.toList.map(v => Extraction.decompose(v)))
}
case (MapType(StringType, vt, _), v: MapDataNotDeprecated) =>
case (MapType(kt, vt, _), v: MapDataNotDeprecated) =>
/* Maps will be serialized as sub-objects so keys are constrained to be strings */
val serKeys = v.keyArray().array.map(v => v.toString)
val serKeys = v.keyArray().array.map(v => serializeField(kt -> v))
val serValues = v.valueArray.array.map(v => serializeField(vt -> v))
JObject(
(v.keyArray.array zip serValues) map {
case (k: String, v) => JField(k, v)
} toList
JField("map",
JObject(
JField("keys", JArray(serKeys.toList)),
JField("values", JArray(serValues.toList))
)
)
case (st: StructType, v: Row) => serializeWithSchema(st, v, true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.stratio.crossdata.common.serializers.XDSerializationTest.TestCase
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

Expand Down Expand Up @@ -47,6 +48,7 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
StructField("arraystring",ArrayType(StringType,true),true),
StructField("mapstringint",MapType(StringType,IntegerType,true),true),
StructField("mapstringstring",MapType(StringType,StringType,true),true),
StructField("maptimestampinteger",MapType(TimestampType,IntegerType,true),true),
StructField("struct",StructType(StructField("field1",IntegerType,true)::StructField("field2",IntegerType,true) ::Nil), true),
StructField("arraystruct",ArrayType(StructType(StructField("field1",IntegerType,true)::StructField("field2", IntegerType,true)::Nil),true),true),
StructField("structofstruct",StructType(StructField("field1",TimestampType,true)::StructField("field2", IntegerType, true)::StructField("struct1",StructType(StructField("structField1",StringType,true)::StructField("structField2",IntegerType,true)::Nil),true)::Nil),true)
Expand All @@ -72,6 +74,7 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer
WrappedArray make Array("hello", "world"),
ArrayBasedMapData(Map("b" -> 2)),
ArrayBasedMapData(Map("a" -> "A", "b" -> "B")),
ArrayBasedMapData(Map(java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 25, java.sql.Timestamp.valueOf("2015-11-30 10:00:00.0") -> 12)),
new GenericRowWithSchema(Array(99,98), StructType(StructField("field1", IntegerType)
::StructField("field2", IntegerType)::Nil)),
WrappedArray make Array(
Expand Down Expand Up @@ -102,11 +105,11 @@ class RowSerializerSpec extends XDSerializationTest[Row] with CrossdataCommonSer


implicit val formats = json4sJacksonFormats + new RowSerializer(schema)


override def testCases: Seq[TestCase] = Seq(
TestCase("marshall & unmarshall a row with no schema", rowWithNoSchema),
TestCase("marshall & unmarshall a row with schema", rowWithSchema)
)

}
}

0 comments on commit 9f2e3a1

Please sign in to comment.