From bfe6cf22b1b85b3faf1ed0fda93bdda8edafc5ca Mon Sep 17 00:00:00 2001 From: xubo245 Date: Fri, 31 May 2019 20:33:25 +0800 Subject: [PATCH] [CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary --- .../testsuite/binary/TestBinaryDataType.scala | 32 ++++ .../SparkCarbonDataSourceBinaryTest.scala | 140 +++++++++++------- 2 files changed, 117 insertions(+), 55 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala index 89c89dc52b0..5439d81f763 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/binary/TestBinaryDataType.scala @@ -63,6 +63,17 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { } assert(flag) + sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray)) + sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes())) + + val udfHexResult = sql("SELECT decodeHex(binaryField) FROM binaryTable") + val unhexResult = sql("SELECT unhex(binaryField) FROM binaryTable") + checkAnswer(udfHexResult, unhexResult) + + val udfBase64Result = sql("SELECT decodeBase64(binaryField) FROM binaryTable") + val unbase64Result = sql("SELECT unbase64(binaryField) FROM binaryTable") + checkAnswer(udfBase64Result, unbase64Result) + checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3))) try { val df = sql("SELECT * FROM binaryTable").collect() @@ -612,6 +623,27 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll { | OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail') """.stripMargin) + val hexHiveResult = sql("SELECT hex(binaryField) FROM hivetable") + val hexCarbonResult = sql("SELECT hex(binaryField) FROM carbontable") + checkAnswer(hexHiveResult, hexCarbonResult) + hexCarbonResult.collect().foreach { each => + val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray)) + assert("\u0001history\u0002".equals(result) + || "\u0001biology\u0002".equals(result) + || "\u0001education\u0002".equals(result)) + } + + val base64HiveResult = sql("SELECT base64(binaryField) FROM hivetable") + val base64CarbonResult = sql("SELECT base64(binaryField) FROM carbontable") + checkAnswer(base64HiveResult, base64CarbonResult) + base64CarbonResult.collect().foreach { each => + val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString)) + assert("\u0001history\u0002".equals(result) + || "\u0001biology\u0002".equals(result) + || "\u0001education\u0002".equals(result)) + } + + val hiveResult = sql("SELECT * FROM hivetable") val carbonResult = sql("SELECT * FROM carbontable") checkAnswer(hiveResult, carbonResult) diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala index bdfc9dd5ce4..d234576ed92 100644 --- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala +++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceBinaryTest.scala @@ -17,16 +17,14 @@ package org.apache.spark.sql.carbondata.datasource import java.io.File - import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.sdk.util.BinaryUtil +import org.apache.commons.codec.binary.{Base64, Hex} import org.apache.commons.io.FileUtils - import org.apache.spark.sql.Row import org.apache.spark.sql.carbondata.datasource.TestUtil._ import org.apache.spark.util.SparkUtil - import org.scalatest.{BeforeAndAfterAll, FunSuite} class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { @@ -257,9 +255,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { test("insert into for hive and carbon, CTAS") { sql("DROP TABLE IF EXISTS hiveTable") - sql("DROP TABLE IF EXISTS carbontable") + sql("DROP TABLE IF EXISTS carbon_table") sql("DROP TABLE IF EXISTS hiveTable2") - sql("DROP TABLE IF EXISTS carbontable2") + sql("DROP TABLE IF EXISTS carbon_table2") sql( s""" | CREATE TABLE IF NOT EXISTS hivetable ( @@ -275,7 +273,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { sql( s""" - | CREATE TABLE IF NOT EXISTS carbontable ( + | CREATE TABLE IF NOT EXISTS carbon_table ( | id int, | label boolean, | name string, @@ -283,9 +281,28 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { | autoLabel boolean) | using carbon """.stripMargin) - sql("insert into carbontable values(1,true,'Bob','binary',false)") - sql("insert into carbontable values(2,false,'Xu','test',true)") - val carbonResult = sql("SELECT * FROM carbontable") + sql("insert into carbon_table values(1,true,'Bob','binary',false)") + sql("insert into carbon_table values(2,false,'Xu','test',true)") + + val hexHiveResult = sql("SELECT hex(image) FROM hivetable") + val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table") + checkAnswer(hexHiveResult, hexCarbonResult) + hexCarbonResult.collect().foreach { each => + val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray)) + assert("binary".equals(result) + || "test".equals(result)) + } + + val base64HiveResult = sql("SELECT base64(image) FROM hivetable") + val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table") + checkAnswer(base64HiveResult, base64CarbonResult) + base64CarbonResult.collect().foreach { each => + val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString)) + assert("binary".equals(result) + || "test".equals(result)) + } + + val carbonResult = sql("SELECT * FROM carbon_table") val hiveResult = sql("SELECT * FROM hivetable") assert(2 == carbonResult.collect().length) @@ -301,9 +318,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { } } - sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable") - sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable") - val carbonResult2 = sql("SELECT * FROM carbontable2") + sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table") + sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable") + val carbonResult2 = sql("SELECT * FROM carbon_table2") val hiveResult2 = sql("SELECT * FROM hivetable2") checkAnswer(hiveResult2, carbonResult2) checkAnswer(carbonResult, carbonResult2) @@ -311,9 +328,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { assert(2 == carbonResult2.collect().length) assert(2 == hiveResult2.collect().length) - sql("INSERT INTO hivetable2 SELECT * FROM carbontable") - sql("INSERT INTO carbontable2 SELECT * FROM hivetable") - val carbonResult3 = sql("SELECT * FROM carbontable2") + sql("INSERT INTO hivetable2 SELECT * FROM carbon_table") + sql("INSERT INTO carbon_table2 SELECT * FROM hivetable") + val carbonResult3 = sql("SELECT * FROM carbon_table2") val hiveResult3 = sql("SELECT * FROM hivetable2") checkAnswer(carbonResult3, hiveResult3) assert(4 == carbonResult3.collect().length) @@ -322,9 +339,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { test("insert into for parquet and carbon, CTAS") { sql("DROP TABLE IF EXISTS parquetTable") - sql("DROP TABLE IF EXISTS carbontable") + sql("DROP TABLE IF EXISTS carbon_table") sql("DROP TABLE IF EXISTS parquetTable2") - sql("DROP TABLE IF EXISTS carbontable2") + sql("DROP TABLE IF EXISTS carbon_table2") sql( s""" | CREATE TABLE IF NOT EXISTS parquettable ( @@ -340,7 +357,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { sql( s""" - | CREATE TABLE IF NOT EXISTS carbontable ( + | CREATE TABLE IF NOT EXISTS carbon_table ( | id int, | label boolean, | name string, @@ -348,9 +365,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { | autoLabel boolean) | using carbon """.stripMargin) - sql("insert into carbontable values(1,true,'Bob','binary',false)") - sql("insert into carbontable values(2,false,'Xu','test',true)") - val carbonResult = sql("SELECT * FROM carbontable") + sql("insert into carbon_table values(1,true,'Bob','binary',false)") + sql("insert into carbon_table values(2,false,'Xu','test',true)") + val carbonResult = sql("SELECT * FROM carbon_table") val parquetResult = sql("SELECT * FROM parquettable") assert(2 == carbonResult.collect().length) @@ -366,9 +383,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { } } - sql("CREATE TABLE parquettable2 AS SELECT * FROM carbontable") - sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM parquettable") - val carbonResult2 = sql("SELECT * FROM carbontable2") + sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table") + sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM parquettable") + val carbonResult2 = sql("SELECT * FROM carbon_table2") val parquetResult2 = sql("SELECT * FROM parquettable2") checkAnswer(parquetResult2, carbonResult2) checkAnswer(carbonResult, carbonResult2) @@ -376,9 +393,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { assert(2 == carbonResult2.collect().length) assert(2 == parquetResult2.collect().length) - sql("INSERT INTO parquettable2 SELECT * FROM carbontable") - sql("INSERT INTO carbontable2 SELECT * FROM parquettable") - val carbonResult3 = sql("SELECT * FROM carbontable2") + sql("INSERT INTO parquettable2 SELECT * FROM carbon_table") + sql("INSERT INTO carbon_table2 SELECT * FROM parquettable") + val carbonResult3 = sql("SELECT * FROM carbon_table2") val parquetResult3 = sql("SELECT * FROM parquettable2") checkAnswer(carbonResult3, parquetResult3) assert(4 == carbonResult3.collect().length) @@ -387,9 +404,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { test("insert into carbon as select from hive after hive load data") { sql("DROP TABLE IF EXISTS hiveTable") - sql("DROP TABLE IF EXISTS carbontable") + sql("DROP TABLE IF EXISTS carbon_table") sql("DROP TABLE IF EXISTS hiveTable2") - sql("DROP TABLE IF EXISTS carbontable2") + sql("DROP TABLE IF EXISTS carbon_table2") sql( s""" @@ -409,7 +426,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { sql( s""" - | CREATE TABLE IF NOT EXISTS carbontable ( + | CREATE TABLE IF NOT EXISTS carbon_table ( | id int, | label boolean, | name string, @@ -417,8 +434,20 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { | autoLabel boolean) | using carbon """.stripMargin) - sql("insert into carbontable select * from hivetable") - val carbonResult = sql("SELECT * FROM carbontable") + sql("insert into carbon_table select * from hivetable") + + sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray)) + sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes())) + + val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table") + val unhexResult = sql("SELECT unhex(image) FROM carbon_table") + checkAnswer(udfHexResult, unhexResult) + + val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table") + val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table") + checkAnswer(udfBase64Result, unbase64Result) + + val carbonResult = sql("SELECT * FROM carbon_table") val hiveResult = sql("SELECT * FROM hivetable") assert(3 == carbonResult.collect().length) @@ -437,9 +466,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { } } - sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable") - sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable") - val carbonResult2 = sql("SELECT * FROM carbontable2") + sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table") + sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable") + val carbonResult2 = sql("SELECT * FROM carbon_table2") val hiveResult2 = sql("SELECT * FROM hivetable2") checkAnswer(hiveResult2, carbonResult2) checkAnswer(carbonResult, carbonResult2) @@ -447,18 +476,19 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { assert(3 == carbonResult2.collect().length) assert(3 == hiveResult2.collect().length) - sql("INSERT INTO hivetable2 SELECT * FROM carbontable") - sql("INSERT INTO carbontable2 SELECT * FROM hivetable") - val carbonResult3 = sql("SELECT * FROM carbontable2") + sql("INSERT INTO hivetable2 SELECT * FROM carbon_table") + sql("INSERT INTO carbon_table2 SELECT * FROM hivetable") + val carbonResult3 = sql("SELECT * FROM carbon_table2") val hiveResult3 = sql("SELECT * FROM hivetable2") checkAnswer(carbonResult3, hiveResult3) assert(6 == carbonResult3.collect().length) assert(6 == hiveResult3.collect().length) + } test("filter for hive and carbon") { sql("DROP TABLE IF EXISTS hiveTable") - sql("DROP TABLE IF EXISTS carbontable") + sql("DROP TABLE IF EXISTS carbon_table") sql( s""" @@ -475,7 +505,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { sql( s""" - | CREATE TABLE IF NOT EXISTS carbontable ( + | CREATE TABLE IF NOT EXISTS carbon_table ( | id int, | label boolean, | name string, @@ -483,12 +513,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { | autoLabel boolean) | using carbon """.stripMargin) - sql("insert into carbontable values(1,true,'Bob','binary',false)") - sql("insert into carbontable values(2,false,'Xu','test',true)") + sql("insert into carbon_table values(1,true,'Bob','binary',false)") + sql("insert into carbon_table values(2,false,'Xu','test',true)") // filter with equal val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)") - val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)") + val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)") checkAnswer(hiveResult, carbonResult) assert(1 == carbonResult.collect().length) @@ -499,13 +529,13 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { // filter with non string val exception = intercept[Exception] { - sql("SELECT * FROM carbontable where image=binary").collect() + sql("SELECT * FROM carbon_table where image=binary").collect() } assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns")) // filter with not equal val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)") - val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)") + val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)") checkAnswer(hiveResult3, carbonResult3) assert(1 == carbonResult3.collect().length) carbonResult3.collect().foreach { each => @@ -515,7 +545,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { // filter with in val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))") - val carbonResult4 = sql("SELECT * FROM carbontable where image in (cast('binary' as binary))") + val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))") checkAnswer(hiveResult4, carbonResult4) assert(1 == carbonResult4.collect().length) carbonResult4.collect().foreach { each => @@ -525,7 +555,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { // filter with not in val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))") - val carbonResult5 = sql("SELECT * FROM carbontable where image not in (cast('binary' as binary))") + val carbonResult5 = sql("SELECT * FROM carbon_table where image not in (cast('binary' as binary))") checkAnswer(hiveResult5, carbonResult5) assert(1 == carbonResult5.collect().length) carbonResult5.collect().foreach { each => @@ -535,12 +565,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { } test("Spark DataSource don't support update, delete") { - sql("DROP TABLE IF EXISTS carbontable") - sql("DROP TABLE IF EXISTS carbontable2") + sql("DROP TABLE IF EXISTS carbon_table") + sql("DROP TABLE IF EXISTS carbon_table2") sql( s""" - | CREATE TABLE IF NOT EXISTS carbontable ( + | CREATE TABLE IF NOT EXISTS carbon_table ( | id int, | label boolean, | name string, @@ -548,10 +578,10 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { | autoLabel boolean) | using carbon """.stripMargin) - sql("insert into carbontable values(1,true,'Bob','binary',false)") - sql("insert into carbontable values(2,false,'Xu','test',true)") + sql("insert into carbon_table values(1,true,'Bob','binary',false)") + sql("insert into carbon_table values(2,false,'Xu','test',true)") - val carbonResult = sql("SELECT * FROM carbontable") + val carbonResult = sql("SELECT * FROM carbon_table") carbonResult.collect().foreach { each => if (1 == each.get(0)) { @@ -564,12 +594,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll { } var exception = intercept[Exception] { - sql("UPDATE carbontable SET binaryField = 'binary2' WHERE id = 1").show() + sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").show() } assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting")) exception = intercept[Exception] { - sql("DELETE FROM carbontable WHERE id = 1").show() + sql("DELETE FROM carbon_table WHERE id = 1").show() } assert(exception.getMessage.contains("Operation not allowed: DELETE FROM")) }