[SPARK-3569][SQL] Add metadata field to StructField #2701

Closed · wants to merge 29 commits into apache:master from mengxr:structfield-metadata

Commits (29, changes shown from all commits):

c194d5e  add metadata field to StructField and Attribute (mengxr, Sep 17, 2014)
367d237  add test (mengxr, Oct 7, 2014)
d65072e  remove Map.empty (mengxr, Oct 7, 2014)
67fdebb  add test on join (mengxr, Oct 7, 2014)
d8af0ed  move tests to SQLQuerySuite (mengxr, Oct 8, 2014)
c41a664  merge master (mengxr, Oct 8, 2014)
7e5a322  do not output metadata in StructField.toString (mengxr, Oct 8, 2014)
61b8e0f  merge master (mengxr, Oct 9, 2014)
618e349  make tests work in scala (mengxr, Oct 9, 2014)
905bb89  java conversions (mengxr, Oct 9, 2014)
93518fb  support metadata in python (mengxr, Oct 9, 2014)
e42c452  merge master (mengxr, Oct 13, 2014)
60614c7  add metadata (mengxr, Oct 14, 2014)
60cc131  add doc and header (mengxr, Oct 14, 2014)
1fcbf13  change metadata type in StructField for Scala/Java (mengxr, Oct 14, 2014)
c9d7301  organize imports (mengxr, Oct 14, 2014)
473a7c5  merge master (mengxr, Oct 14, 2014)
24a9f80  Merge remote-tracking branch 'apache/master' into structfield-metadata (mengxr, Oct 15, 2014)
3f49aab  remove StructField.toString (mengxr, Oct 15, 2014)
4266f4d  add StructField.toString back for backward compatibility (mengxr, Oct 15, 2014)
a438440  Merge remote-tracking branch 'apache/master' into structfield-metadata (mengxr, Oct 16, 2014)
ddfcfad  Merge remote-tracking branch 'apache/master' into structfield-metadata (mengxr, Oct 21, 2014)
611d3c2  move metadata from Expr to NamedExpr (mengxr, Oct 21, 2014)
1e2abcf  change default value of metadata to None in python (mengxr, Oct 23, 2014)
589f314  Merge remote-tracking branch 'apache/master' into structfield-metadata (mengxr, Oct 23, 2014)
886b85c  Expose Metadata and MetadataBuilder through the public scala and java… (marmbrus, Oct 29, 2014)
c35203f  Merge pull request #1 from marmbrus/pr/2701 (mengxr, Oct 30, 2014)
5ef930a  Merge remote-tracking branch 'apache/master' into structfield-metadata (mengxr, Nov 1, 2014)
dedda56  merge remote (mengxr, Nov 1, 2014)
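
Taken together, these commits add an optional metadata field to StructField and thread it through Catalyst's attribute machinery. Below is a minimal sketch of the user-facing shape, assuming the Metadata and MetadataBuilder classes exposed in commit 886b85c and the 1.2-era catalyst packages (these types later moved under org.apache.spark.sql.types); field names and metadata keys are invented for illustration:

import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.{Metadata, MetadataBuilder}

object MetadataSketch {
  def main(args: Array[String]): Unit = {
    // Metadata is immutable; MetadataBuilder accumulates keys mapped to
    // JSON-serializable values and then builds the Metadata instance.
    val ageMeta: Metadata = new MetadataBuilder()
      .putString("comment", "age, in years")
      .build()

    // StructField now takes metadata as an optional fourth parameter.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = false, metadata = ageMeta)))

    // The metadata rides along with the field.
    println(schema.fields(1).metadata.getString("comment"))  // age, in years
  }
}
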
python/pyspark/sql.py: 15 changes (11 additions, 4 deletions)
@@ -313,12 +313,15 @@ class StructField(DataType):

     """

-    def __init__(self, name, dataType, nullable):
+    def __init__(self, name, dataType, nullable, metadata=None):
         """Creates a StructField
         :param name: the name of this field.
         :param dataType: the data type of this field.
         :param nullable: indicates whether values of this field
                          can be null.
+        :param metadata: metadata of this field, which is a map from string
+                         to simple type that can be serialized to JSON
+                         automatically

         >>> (StructField("f1", StringType, True)
         ...  == StructField("f1", StringType, True))
@@ -330,6 +333,7 @@ def __init__(self, name, dataType, nullable):
         self.name = name
         self.dataType = dataType
         self.nullable = nullable
+        self.metadata = metadata or {}

     def __repr__(self):
         return "StructField(%s,%s,%s)" % (self.name, self.dataType,
@@ -338,13 +342,15 @@ def __repr__(self):
     def jsonValue(self):
         return {"name": self.name,
                 "type": self.dataType.jsonValue(),
-                "nullable": self.nullable}
+                "nullable": self.nullable,
+                "metadata": self.metadata}

     @classmethod
     def fromJson(cls, json):
         return StructField(json["name"],
                            _parse_datatype_json_value(json["type"]),
-                           json["nullable"])
+                           json["nullable"],
+                           json["metadata"])


 class StructType(DataType):
@@ -423,7 +429,8 @@ def _parse_datatype_json_string(json_string):
     ...         StructField("simpleArray", simple_arraytype, True),
     ...         StructField("simpleMap", simple_maptype, True),
     ...         StructField("simpleStruct", simple_structtype, True),
-    ...         StructField("boolean", BooleanType(), False)])
+    ...         StructField("boolean", BooleanType(), False),
+    ...         StructField("withMeta", DoubleType(), False, {"name": "age"})])
     >>> check_datatype(complex_structtype)
     True
     >>> # Complex ArrayType.
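
The Python change mirrors the Scala serialization contract: metadata defaults to an empty map and is carried through jsonValue and fromJson. The same round trip on the Scala side, as a sketch that assumes DataType.json as the serializer counterpart of the DataType.fromJson visible in the dataTypes.scala hunks below:

import org.apache.spark.sql.catalyst.types.{DataType, DoubleType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.MetadataBuilder

object MetadataJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val meta = new MetadataBuilder().putString("name", "age").build()
    val schema = StructType(Seq(
      StructField("withMeta", DoubleType, nullable = false, metadata = meta)))

    // jsonValue now emits a "metadata" entry and parseStructField reads it
    // back, so a schema survives the JSON round trip with metadata intact.
    val restored = DataType.fromJson(schema.json)
    assert(restored == schema)
  }
}
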
@@ -46,7 +46,7 @@ object ScalaReflection {
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) =>
-      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
   }

   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */

@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType, NativeType}
+import org.apache.spark.sql.catalyst.util.Metadata

 abstract class Expression extends TreeNode[Expression] {
   self: Product =>

@@ -43,7 +43,7 @@ abstract class Generator extends Expression {
   override type EvaluatedType = TraversableOnce[Row]

   override lazy val dataType =
-    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))

   override def nullable = false

@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util.Metadata

 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
@@ -43,6 +44,9 @@ abstract class NamedExpression extends Expression {

   def toAttribute: Attribute

+  /** Returns the metadata when an expression is a reference to another expression with metadata. */
+  def metadata: Metadata = Metadata.empty
+
   protected def typeSuffix =
     if (resolved) {
       dataType match {
@@ -88,10 +92,16 @@ case class Alias(child: Expression, name: String)

   override def dataType = child.dataType
   override def nullable = child.nullable
+  override def metadata: Metadata = {
+    child match {
+      case named: NamedExpression => named.metadata
+      case _ => Metadata.empty
+    }
+  }

   override def toAttribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifiers)
     } else {
       UnresolvedAttribute(name)
     }
@@ -108,15 +118,20 @@ case class Alias(child: Expression, name: String)
  * @param name The name of this attribute, should only be used during analysis or for debugging.
  * @param dataType The [[DataType]] of this attribute.
  * @param nullable True if null is a valid value for this attribute.
+ * @param metadata The metadata of this attribute.
  * @param exprId A globally unique id used to check if different AttributeReferences refer to the
  *               same attribute.
  * @param qualifiers a list of strings that can be used to referred to this attribute in a fully
  *                   qualified way. Consider the examples tableName.name, subQueryAlias.name.
  *                   tableName and subQueryAlias are possible qualifiers.
  */
-case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
-    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
-  extends Attribute with trees.LeafNode[Expression] {
+case class AttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Metadata = Metadata.empty)(
+    val exprId: ExprId = NamedExpression.newExprId,
+    val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {

   override def equals(other: Any) = other match {
     case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
@@ -128,10 +143,12 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean
     var h = 17
     h = h * 37 + exprId.hashCode()
     h = h * 37 + dataType.hashCode()
+    h = h * 37 + metadata.hashCode()
     h
   }

-  override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance() =
+    AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)

   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -140,7 +157,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
     }
   }

@@ -159,7 +176,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean
     if (newQualifiers.toSet == qualifiers.toSet) {
       this
     } else {
-      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
     }
   }
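
Two behaviors fall out of the namedExpressions.scala changes above: an Alias forwards its child's metadata, and every AttributeReference copy (newInstance, withNullability, withQualifiers) preserves it. A hypothetical Catalyst-internal sketch, with identifiers invented for illustration:

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.util.MetadataBuilder

object AliasMetadataSketch {
  def main(args: Array[String]): Unit = {
    val meta = new MetadataBuilder().putString("comment", "age, in years").build()
    val age = AttributeReference("age", IntegerType, nullable = false, metadata = meta)()

    // Renaming through an Alias keeps the child's metadata...
    val renamed = Alias(age, "age_renamed")()
    assert(renamed.metadata == meta)

    // ...and so does the attribute the alias resolves to.
    assert(renamed.toAttribute.metadata == meta)

    // Copies made while rewriting trees preserve metadata as well.
    assert(age.withNullability(true).metadata == meta)
  }
}
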
@@ -24,16 +24,16 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
 import scala.util.parsing.combinator.RegexParsers

-import org.json4s.JsonAST.JValue
+import org.json4s._
+import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.util.Metadata
 import org.apache.spark.util.Utils

-
 object DataType {
   def fromJson(json: String): DataType = parseDataType(parse(json))

@@ -70,10 +70,11 @@

   private def parseStructField(json: JValue): StructField = json match {
     case JSortedObject(
+        ("metadata", metadata: JObject),
         ("name", JString(name)),
         ("nullable", JBool(nullable)),
         ("type", dataType: JValue)) =>
-      StructField(name, parseDataType(dataType), nullable)
+      StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
   }

   @deprecated("Use DataType.fromJson instead", "1.2.0")
@@ -388,24 +389,34 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
  * @param name The name of this field.
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g, in selection.
  */
-case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    metadata: Metadata = Metadata.empty) {

   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
     DataType.buildFormattedString(dataType, s"$prefix |", builder)
   }

+  // override the default toString to be compatible with legacy parquet files.
+  override def toString: String = s"StructField($name,$dataType,$nullable)"
+
   private[sql] def jsonValue: JValue = {
     ("name" -> name) ~
       ("type" -> dataType.jsonValue) ~
-      ("nullable" -> nullable)
+      ("nullable" -> nullable) ~
+      ("metadata" -> metadata.jsonValue)
   }
 }

 object StructType {
   protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
-    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 }

 case class StructType(fields: Seq[StructField]) extends DataType {
@@ -439,7 +450,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
   }

   protected[sql] def toAttributes =
-    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())

   def treeString: String = {
     val builder = new StringBuilder
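
StructType.toAttributes and StructType.fromAttributes are protected[sql], so the round trip below would have to live somewhere under the org.apache.spark.sql package; it sketches the invariant the fromAttributes and toAttributes hunks above establish, namely that converting a schema to attributes and back loses no metadata:

// Placed in a subpackage of org.apache.spark.sql so the protected[sql]
// helpers are visible; the package and object names are invented.
package org.apache.spark.sql.sketch

import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.MetadataBuilder

object AttributeRoundTrip {
  def main(args: Array[String]): Unit = {
    val meta = new MetadataBuilder().putString("comment", "user age").build()
    val schema = StructType(Seq(
      StructField("age", IntegerType, nullable = false, metadata = meta)))

    // StructField.metadata flows into AttributeReference.metadata and back.
    val restored = StructType.fromAttributes(schema.toAttributes)
    assert(restored == schema)
  }
}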