From 915ebe88294b256334b73aed9726812b23ab25c7 Mon Sep 17 00:00:00 2001
From: rJabong
Date: Mon, 20 Jun 2016 12:29:13 +0530
Subject: [PATCH 1/2] Replace NullType with StringType in a DataFrame schema
 so that the DataFrame can be written in Parquet format

---
 .../apache/spark/sql/schema/SchemaUtils.scala | 51 +++++++++++++++
 .../spark/sql/schema/TestSchemaUtils.scala    | 35 ++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/schema/TestSchemaUtils.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
new file mode 100644
index 0000000000000..44b0efa4472dc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
@@ -0,0 +1,51 @@
+package org.apache.spark.sql.schema
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+object SchemaUtils {
+
+  /**
+   * Replaces every NullType in the schema of the input DataFrame with StringType,
+   * so that the DataFrame can be written out in Parquet format.
+   * @param dataFrame the input DataFrame
+   * @param sqlContext the SQLContext used to rebuild the DataFrame
+   * @return a DataFrame whose schema contains no NullType fields
+   */
+  def replaceNullTypeToStringType(dataFrame: DataFrame, sqlContext: SQLContext): DataFrame = {
+    require(dataFrame != null, "dataFrame cannot be null")
+    require(sqlContext != null, "sqlContext cannot be null")
+
+    val schema = getStructType(dataFrame.schema)
+    sqlContext.createDataFrame(dataFrame.rdd, schema)
+  }
+
+  /**
+   * Replaces every NullType in the input schema with StringType.
+   * @param st the input schema
+   * @return the rewritten schema
+   */
+  def getStructType(st: StructType): StructType = {
+    require(st != null, "StructType cannot be null")
+
+    StructType(st.fields.map { field =>
+      StructField(field.name, getDataType(field.dataType), field.nullable, field.metadata)
+    })
+  }
+
+  /**
+   * Recursively replaces NullType with StringType inside a single DataType.
+   * @param dataType the input type
+   * @return the rewritten type
+   */
+  private def getDataType(dataType: DataType): DataType = dataType match {
+    case at: ArrayType =>
+      ArrayType(getDataType(at.elementType), at.containsNull)
+    case st: StructType =>
+      getStructType(st)
+    case NullType =>
+      StringType
+    case other =>
+      other
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/schema/TestSchemaUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/schema/TestSchemaUtils.scala
new file mode 100644
index 0000000000000..4bd0204e11110
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/schema/TestSchemaUtils.scala
@@ -0,0 +1,35 @@
+package org.apache.spark.sql.schema
+
+import org.apache.spark.sql.types._
+import org.scalatest.{FlatSpec, Matchers}
+
+class TestSchemaUtils extends FlatSpec with Matchers with Serializable {
+
+  "Test-1: getStructType" should "replace NullType with StringType inside an ArrayType" in {
+    val testSchema1 = StructType(Array(
+      StructField("value", ArrayType(StructType(Array(
+        StructField("seqId", IntegerType, true),
+        StructField("value", NullType, true))), false), true)))
+    val testSchema2 = StructType(Array(
+      StructField("value", ArrayType(StructType(Array(
+        StructField("seqId", IntegerType, true),
+        StructField("value", StringType, true))), false), true)))
+    SchemaUtils.getStructType(testSchema1) should be (testSchema2)
+  }
+
+  "Test-2: getStructType" should "replace NullType with StringType inside a nested StructType" in {
+    val testSchema1 = StructType(Array(
+      StructField("additionalStrap", StructType(Array(
+        StructField("seqId", IntegerType, true),
+        StructField("isGlobal", BooleanType, true),
+        StructField("label", NullType, true),
+        StructField("name", StringType, true))), true)))
+    val testSchema2 = StructType(Array(
+      StructField("additionalStrap", StructType(Array(
+        StructField("seqId", IntegerType, true),
+        StructField("isGlobal", BooleanType, true),
+        StructField("label", StringType, true),
+        StructField("name", StringType, true))), true)))
+    SchemaUtils.getStructType(testSchema1) should be (testSchema2)
+  }
+}

From 195b24d1e7f3a68e99fd27a1781c6f2298ae6706 Mon Sep 17 00:00:00 2001
From: rJabong
Date: Tue, 21 Jun 2016 02:00:09 +0530
Subject: [PATCH 2/2] Handle MapType when replacing NullType with StringType
 in a DataFrame schema

---
 .../main/scala/org/apache/spark/sql/schema/SchemaUtils.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
index 44b0efa4472dc..324cf668eaa01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/schema/SchemaUtils.scala
@@ -39,6 +39,8 @@ object SchemaUtils {
    * @return the rewritten type
    */
   private def getDataType(dataType: DataType): DataType = dataType match {
+    case mt: MapType =>
+      MapType(getDataType(mt.keyType), getDataType(mt.valueType), mt.valueContainsNull)
     case at: ArrayType =>
       ArrayType(getDataType(at.elementType), at.containsNull)
     case st: StructType =>
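---
Usage note (not part of the patches above): a minimal sketch of how the helper
would be called, assuming a Spark 1.6-style SQLContext named `sqlContext`; the
JSON input path and the Parquet output path are hypothetical. JSON schema
inference assigns NullType to columns that are null in every record, and the
Parquet writer rejects NullType, which is what these patches work around.

    import org.apache.spark.sql.schema.SchemaUtils

    // All-null columns come back from inference as NullType, so writing
    // this DataFrame to Parquet directly would fail.
    val raw = sqlContext.read.json("/tmp/events.json")

    // Rewrite every NullType in the schema to StringType, then the
    // Parquet write succeeds.
    val fixed = SchemaUtils.replaceNullTypeToStringType(raw, sqlContext)
    fixed.write.parquet("/tmp/events.parquet")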