diff --git a/docs/en/transform/nulltf.md b/docs/en/transform/nulltf.md new file mode 100644 index 00000000000..dad4ee945b0 --- /dev/null +++ b/docs/en/transform/nulltf.md @@ -0,0 +1,73 @@ +# Nulltf + +## Description + +Set default values for null fields. + +:::tip + +This transform is only supported by the Spark engine. + +::: + +## Options + +| name | type | required | default value | +| ------------------- | ------- | -------- | ------------- | +| fields | object | no | - | + +### fields [object] + +A map of field names to the default value used when the field is null. +Each default is set in the form "field: value". If no default is set for a field, one is chosen according to the field type. + +## Examples + +The configuration + +```bash + nulltf { + fields { + name: "", + price: 0, + num: 100, + flag: false, + dt_timestamp: "2022-05-18 13:51:40.603", + dt_date: "2022-05-19" + } + } +``` + +Before using the nulltf transform + +```bash ++-----+-----+----+-----+--------------------+----------+ +| name|price| num| flag| dt_timestamp| dt_date| ++-----+-----+----+-----+--------------------+----------+ +|名称1| 22.5| 100|false|2022-05-20 14:34:...|2022-05-20| +| null| 22.5| 100|false|2022-05-20 14:35:...|2022-05-20| +|名称1| null| 100|false|2022-05-20 14:35:...|2022-05-20| +|名称1| 22.5|null|false|2022-05-20 14:36:...|2022-05-20| +|名称1| 22.5| 100| null|2022-05-20 14:36:...|2022-05-20| +|名称1| 22.5| 100|false| null|2022-05-20| +|名称1| 22.5| 100|false|2022-05-20 14:37:...| null| ++-----+-----+----+-----+--------------------+----------+ +``` + +After using the nulltf transform + +```bash ++-----+-----+----+-----+--------------------+----------+ +| name|price| num| flag| dt_timestamp| dt_date| ++-----+-----+----+-----+--------------------+----------+ +|名称1| 22.5|100|false|2022-05-20 14:34:...|2022-05-20| +| | 22.5|100|false|2022-05-20 14:35:...|2022-05-20| +|名称1| 0.0|100|false|2022-05-20 14:35:...|2022-05-20| +|名称1| 22.5|100|false|2022-05-20 14:36:...|2022-05-20| +|名称1| 22.5|100|false|2022-05-20 
14:36:...|2022-05-20| +|名称1| 22.5|100|false|2022-05-18 13:51:...|2022-05-20| +|名称1| 22.5|100|false|2022-05-20 14:37:...|2022-05-19| ++-----+-----+---+-----+--------------------+----------+ +``` + + diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml index 968b99f030d..5c1264b1850 100644 --- a/seatunnel-core/seatunnel-core-spark/pom.xml +++ b/seatunnel-core/seatunnel-core-spark/pom.xml @@ -107,6 +107,12 @@ ${project.version} + + org.apache.seatunnel + seatunnel-transform-spark-nulltf + ${project.version} + + org.apache.seatunnel seatunnel-transform-spark-null-rate diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml index 7e1740c3be2..8367d8bdc2a 100644 --- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml +++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml @@ -36,6 +36,7 @@ seatunnel-transform-spark-replace seatunnel-transform-spark-uuid seatunnel-transform-spark-sql + seatunnel-transform-spark-nulltf seatunnel-transform-spark-null-rate diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/pom.xml new file mode 100644 index 00000000000..6d4d7412a55 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/pom.xml @@ -0,0 +1,51 @@ + + + + + org.apache.seatunnel + seatunnel-transforms-spark + ${revision} + + 4.0.0 + + seatunnel-transform-spark-nulltf + + + + org.apache.seatunnel + seatunnel-api-spark + ${project.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + + + + diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform 
b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform new file mode 100644 index 00000000000..5a9467e5f95 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.seatunnel.spark.transform.Nulltf diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/scala/org/apache/seatunnel/spark/transform/Nulltf.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/scala/org/apache/seatunnel/spark/transform/Nulltf.scala new file mode 100644 index 00000000000..aa0cf5b3e94 --- /dev/null +++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-nulltf/src/main/scala/org/apache/seatunnel/spark/transform/Nulltf.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/**
 * Spark transform that replaces null column values with default values.
 *
 * Defaults can be configured per column under the `fields` option as
 * `columnName: value`. A configured value is read as a string and converted to the
 * column's Spark data type. A null column without a configured value receives a
 * default chosen from its data type.
 */
class Nulltf extends BaseSparkTransform {

  /**
   * Returns a dataset with the schema of `df` in which null values are replaced.
   *
   * @param df  input dataset
   * @param env Spark environment; not used by this transform
   * @return a dataset with the same schema as `df`, nulls replaced by defaults
   * @throws IllegalArgumentException when a configured value cannot be parsed as the
   *                                  type of a column that contains a null (raised
   *                                  while the rows are being processed)
   */
  override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
    // Explicit `.asScala` instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    // `fields` is optional: only read it when it is present, otherwise every null
    // falls back to the type-based default.
    val fieldDefaults: Map[String, String] =
      if (config.hasPath(FIELDS)) {
        config.getConfig(FIELDS).entrySet().asScala
          .map(kv => kv.getKey -> kv.getValue.unwrapped().toString)
          .toMap
      } else {
        Map.empty[String, String]
      }

    val schema = df.schema
    val columns = schema.fields

    df.mapPartitions((rows: Iterator[Row]) =>
      // Map the iterator lazily so a partition is never buffered in memory as a whole.
      rows.map { row =>
        val values = (0 until row.size).map { i =>
          if (row.isNullAt(i)) {
            val column = columns(i)
            fieldDefaults.get(column.name) match {
              case Some(raw) => transformStringToRightType(raw, column.dataType)
              case None => getDefaultValueByDataType(column.dataType)
            }
          } else {
            row.get(i)
          }
        }
        Row.fromSeq(values)
      })(RowEncoder.apply(schema))
  }

  /** Validates the configuration; no option key is passed as required. */
  override def checkConfig(): CheckResult = {
    checkAllExists(config)
  }

  /** Name under which this plugin is referenced. */
  override def getPluginName: String = PLUGIN_NAME

  /**
   * Default value used for a null column that has no configured default.
   *
   * @param dataType Spark data type of the column
   * @return the default for the type; `null` for types not listed here, which
   *         leaves the column null
   */
  private def getDefaultValueByDataType(dataType: DataType): Any = {
    dataType match {
      case StringType => ""
      // Must be a Short: a bare `0` would be boxed as an Integer, which is not a
      // valid value for a smallint column.
      case ShortType => 0.toShort
      case IntegerType => 0
      case FloatType => 0f
      case DoubleType => 0d
      case LongType => 0L
      case BooleanType => false
      // Date and timestamp columns default to the moment the value is produced.
      case DateType => new java.sql.Date(System.currentTimeMillis())
      case TimestampType => new Timestamp(System.currentTimeMillis())
      case _ => null
    }
  }

  /**
   * Converts a configured default, given as a string, to the column's data type.
   *
   * @param value    configured default value
   * @param dataType Spark data type of the column
   * @return `value` parsed as `dataType`; for types not listed here the string is
   *         returned unchanged
   * @throws IllegalArgumentException when `value` is not a valid literal of the
   *                                  target numeric, date or timestamp type
   */
  private def transformStringToRightType(value: String, dataType: DataType): Any = {
    dataType match {
      case StringType => value
      case ShortType => java.lang.Short.valueOf(value)
      case IntegerType => java.lang.Integer.valueOf(value)
      case FloatType => java.lang.Float.valueOf(value)
      case DoubleType => java.lang.Double.valueOf(value)
      case LongType => java.lang.Long.valueOf(value)
      case BooleanType => java.lang.Boolean.valueOf(value)
      case DateType => java.sql.Date.valueOf(value)
      case TimestampType => java.sql.Timestamp.valueOf(value)
      // NOTE(review): the raw string is kept for any other type; confirm that this
      // is acceptable for non-string columns.
      case _ => value
    }
  }

}
/** Names used by the `Nulltf` transform. */
object NulltfConfig {

  /** Plugin name, returned by `Nulltf.getPluginName`. */
  val PLUGIN_NAME: String = "nulltf"

  /** Config key of the block that holds the per-field default values. */
  val FIELDS: String = "fields"

}