-
Notifications
You must be signed in to change notification settings - Fork 4
/
NJDataTypeF.scala
198 lines (155 loc) · 7.52 KB
/
NJDataTypeF.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.github.chenharryhua.nanjin.spark
import cats.Functor
import com.github.chenharryhua.nanjin.common.utils.random4d
import higherkindness.droste.data.Fix
import higherkindness.droste.{scheme, Algebra, Coalgebra}
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.spark.sql.types.*
// Thrown by schemaAlgebra when an NJMapType's key type is not String:
// Avro map schemas only support string keys.
private object KeyMustBeStringException extends Exception("key must be String")
// Pattern functor describing a Spark SQL type tree; the type parameter A is the
// recursion hole, closed over by droste's Fix in NJDataType below.
sealed private[spark] trait NJDataTypeF[A]
private[spark] object NJDataTypeF {
  // Constructors of the pattern functor: one case per supported Spark SQL type.
  // The type parameter K is the recursion hole filled by droste's Fix.

  // numeric types
  final private case class NJByteType[K]() extends NJDataTypeF[K]
  final private case class NJShortType[K]() extends NJDataTypeF[K]
  final private case class NJIntegerType[K]() extends NJDataTypeF[K]
  final private case class NJLongType[K]() extends NJDataTypeF[K]
  final private case class NJFloatType[K]() extends NJDataTypeF[K]
  final private case class NJDoubleType[K]() extends NJDataTypeF[K]
  final private case class NJDecimalType[K](precision: Int, scale: Int) extends NJDataTypeF[K]

  final private case class NJStringType[K]() extends NJDataTypeF[K]
  final private case class NJBinaryType[K]() extends NJDataTypeF[K]
  final private case class NJBooleanType[K]() extends NJDataTypeF[K]

  final private case class NJTimestampType[K]() extends NJDataTypeF[K]
  final private case class NJDateType[K]() extends NJDataTypeF[K]

  // The array element occupies the recursion hole K; map key/value are already-fixed
  // NJDataType values, so maps do not recurse through the functor hole itself.
  final private case class NJArrayType[K](containsNull: Boolean, cont: K) extends NJDataTypeF[K]
  final private case class NJMapType[K](key: NJDataType, value: NJDataType, containsNull: Boolean)
      extends NJDataTypeF[K]

  final private case class NJNullType[K]() extends NJDataTypeF[K]

  final private case class NJStructType[K](className: String, namespace: String, fields: List[NJStructField])
      extends NJDataTypeF[K]

  /** One column of a struct: position, column name, element type and nullability. */
  final case class NJStructField(index: Int, colName: String, dataType: NJDataType, nullable: Boolean) {
    private val dt: String = dataType.toCaseClass

    // Rendered as one case-class field line; a nullable column becomes Option[...].
    val fieldStr: String =
      s""" $colName\t\t\t\t\t\t\t:${if (nullable) s"Option[$dt]" else dt}"""
  }

  /** Fold a type tree back into Spark's native DataType (inverse of [[coalgebra]]). */
  val algebra: Algebra[NJDataTypeF, DataType] = Algebra[NJDataTypeF, DataType] {
    case NJByteType()        => ByteType
    case NJShortType()       => ShortType
    case NJIntegerType()     => IntegerType
    case NJLongType()        => LongType
    case NJFloatType()       => FloatType
    case NJDoubleType()      => DoubleType
    case NJStringType()      => StringType
    case NJBooleanType()     => BooleanType
    case NJBinaryType()      => BinaryType
    case NJTimestampType()   => TimestampType
    case NJDateType()        => DateType
    case NJDecimalType(p, s) => DecimalType(p, s)
    case NJArrayType(c, dt)  => ArrayType(dt, c)
    case NJMapType(k, v, n)  => MapType(k.toSpark, v.toSpark, n)
    case NJStructType(_, _, fields) =>
      // className/namespace are dropped here: Spark's StructType carries no class name.
      StructType(fields.map(a => StructField(a.colName, a.dataType.toSpark, a.nullable)))
    case NJNullType() => NullType
  }

  /** Render a type tree as Scala source text, used to generate case-class definitions. */
  val stringAlgebra: Algebra[NJDataTypeF, String] = Algebra[NJDataTypeF, String] {
    case NJByteType()        => "Byte"
    case NJShortType()       => "Short"
    case NJIntegerType()     => "Int"
    case NJLongType()        => "Long"
    case NJFloatType()       => "Float"
    case NJDoubleType()      => "Double"
    case NJStringType()      => "String"
    case NJBooleanType()     => "Boolean"
    case NJBinaryType()      => "Array[Byte]"
    case NJTimestampType()   => "java.sql.Timestamp"
    case NJDateType()        => "java.sql.Date"
    case NJDecimalType(p, s) => s"BigDecimal($p,$s)"
    // NOTE(review): containsNull is ignored here (unlike the map case below), so a
    // nullable array element is not rendered as Option[...] — confirm this is intended.
    case NJArrayType(_, dt) => s"Array[$dt]"
    case NJMapType(k, v, n) =>
      val vstr = if (n) s"Option[${v.toCaseClass}]" else v.toCaseClass
      s"Map[${k.toCaseClass}, $vstr]"
    case NJStructType(cn, ns, fields) =>
      // NOTE(review): "$ns.$cn" is rendered as the class name, which is not valid
      // Scala syntax in a case-class definition — presumably a marker for manual
      // cleanup (struct names are "FixMe..." placeholders from the coalgebra).
      s"""
         |final case class $ns.$cn (
         |${fields.map(_.fieldStr).mkString(",\n")}
         |)
         |""".stripMargin
    case NJNullType() => "FixMe-NullTypeInferred"
  }

  private val nullSchema: Schema = Schema.create(Schema.Type.NULL)

  // Wrap a schema in a union with null when the position is nullable; otherwise pass through.
  // NOTE(review): null is placed second in the union; Avro convention puts null first
  // when the default value is null — confirm downstream consumers accept this order.
  private def unionNull(nullable: Boolean, sm: Schema): Schema =
    if (nullable) Schema.createUnion(sm, nullSchema) else sm

  /** Fold a type tree into an Avro [[Schema]].
    *
    * [[org.apache.spark.sql.avro.SchemaConverters]] translates decimal to the avro
    * fixed type, which is not yet supported by avro-hugger — hence this hand-rolled
    * algebra, which backs decimals with bytes instead.
    */
  def schemaAlgebra(builder: SchemaBuilder.TypeBuilder[Schema]): Algebra[NJDataTypeF, Schema] =
    Algebra[NJDataTypeF, Schema] {
      // Avro has no 8/16-bit integer primitives, so Byte and Short widen to int.
      case NJByteType()    => builder.intType()
      case NJShortType()   => builder.intType()
      case NJIntegerType() => builder.intType()
      case NJLongType()    => builder.longType()
      case NJFloatType()   => builder.floatType()
      case NJDoubleType()  => builder.doubleType()
      case NJStringType()  => builder.stringType()
      case NJBooleanType() => builder.booleanType()
      case NJBinaryType()  => builder.bytesType()
      // Temporal and decimal types are encoded as Avro logical types over primitives.
      case NJTimestampType()   => LogicalTypes.timestampMillis().addToSchema(builder.longType())
      case NJDateType()        => LogicalTypes.date().addToSchema(builder.intType())
      case NJDecimalType(p, s) => LogicalTypes.decimal(p, s).addToSchema(builder.bytesType())
      case NJArrayType(containsNull, sm) =>
        builder.array().items(unionNull(containsNull, sm))
      // Avro maps only permit string keys; anything else is rejected below.
      case NJMapType(NJDataType(NJStringType()), v, n) =>
        builder.map().values(unionNull(n, v.toSchema(builder)))
      case NJMapType(_, _, _) => throw KeyMustBeStringException
      case NJStructType(cn, ns, fields) =>
        // Build a named record; each nullable field becomes a union with null.
        val fieldsAssembler = SchemaBuilder.builder(ns).record(cn).fields()
        fields.foreach { fs =>
          val dts = fs.dataType.toSchema(SchemaBuilder.builder())
          val schema = unionNull(fs.nullable, dts)
          fieldsAssembler.name(fs.colName).`type`(schema).noDefault()
        }
        fieldsAssembler.endRecord()
      case NJNullType() => nullSchema
    }

  /** Unfold a Spark DataType into the pattern functor (inverse of [[algebra]]). */
  @SuppressWarnings(Array("SuspiciousMatchOnClassObject"))
  val coalgebra: Coalgebra[NJDataTypeF, DataType] =
    Coalgebra[NJDataTypeF, DataType] {
      case ByteType            => NJByteType()
      case ShortType           => NJShortType()
      case IntegerType         => NJIntegerType()
      case LongType            => NJLongType()
      case FloatType           => NJFloatType()
      case DoubleType          => NJDoubleType()
      case dt: DecimalType     => NJDecimalType(dt.precision, dt.scale)
      case BooleanType         => NJBooleanType()
      case BinaryType          => NJBinaryType()
      case StringType          => NJStringType()
      case TimestampType       => NJTimestampType()
      case DateType            => NJDateType()
      case ArrayType(dt, c)    => NJArrayType(c, dt)
      case MapType(k, v, n)    => NJMapType(NJDataType(k), NJDataType(v), n)
      case StructType(fields) =>
        // A Spark StructType has no class name, so a random placeholder is minted
        // (presumably random4d yields four digits — to be replaced by the user).
        NJStructType(
          s"FixMe${random4d.value}",
          "nj.spark",
          fields.toList.zipWithIndex.map { case (st, idx) =>
            NJStructField(idx, st.name, NJDataType(st.dataType), st.nullable)
          }
        )
      case NullType => NJNullType()
      // Spark types with no NJ counterpart (e.g. user-defined types) fail loudly.
      case unknown => sys.error(s"unknown type ${unknown.toString}")
    }

  // Functor instance mechanically derived (kittens); required by droste's schemes.
  implicit val functorNJDataTypeF: Functor[NJDataTypeF] =
    cats.derived.semiauto.functor[NJDataTypeF]
}
/** Fixed point of [[NJDataTypeF]]: a complete, concrete type tree.
  *
  * Each conversion is a catamorphism over the tree using the matching algebra
  * from [[NJDataTypeF]].
  */
final case class NJDataType(value: Fix[NJDataTypeF]) extends AnyVal {

  /** Convert back to Spark's native DataType. */
  def toSpark: DataType = {
    val fold = scheme.cata(NJDataTypeF.algebra)
    fold(value)
  }

  /** Convert to an Avro schema using the supplied builder. */
  def toSchema(builder: SchemaBuilder.TypeBuilder[Schema]): Schema = {
    val fold = scheme.cata(NJDataTypeF.schemaAlgebra(builder))
    fold(value)
  }

  /** Convert to an Avro schema with a default builder. */
  def toSchema: Schema = toSchema(SchemaBuilder.builder())

  /** Render as Scala case-class source text. */
  def toCaseClass: String = {
    val fold = scheme.cata(NJDataTypeF.stringAlgebra)
    fold(value)
  }
}
object NJDataType {

  /** Build the fixed-point representation of a Spark DataType by unfolding it
    * (anamorphism) with [[NJDataTypeF.coalgebra]].
    */
  def apply(spark: DataType): NJDataType = {
    val unfold = scheme.ana(NJDataTypeF.coalgebra)
    NJDataType(unfold(spark))
  }
}