Skip to content

Commit

Permalink
[CARBONDATA-3410] Add UDF, Hex/Base64 SQL functions for binary
Browse files Browse the repository at this point in the history
Add UDF, Hex/Base64 SQL functions for binary

This closes # 3253
  • Loading branch information
xubo245 authored and kumarvishal09 committed Jun 12, 2019
1 parent d2bc0a9 commit c497142
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
}
assert(flag)

sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))

val udfHexResult = sql("SELECT decodeHex(binaryField) FROM binaryTable")
val unhexResult = sql("SELECT unhex(binaryField) FROM binaryTable")
checkAnswer(udfHexResult, unhexResult)

val udfBase64Result = sql("SELECT decodeBase64(binaryField) FROM binaryTable")
val unbase64Result = sql("SELECT unbase64(binaryField) FROM binaryTable")
checkAnswer(udfBase64Result, unbase64Result)

checkAnswer(sql("SELECT COUNT(*) FROM binaryTable"), Seq(Row(3)))
try {
val df = sql("SELECT * FROM binaryTable").collect()
Expand Down Expand Up @@ -614,6 +625,27 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
| OPTIONS('header'='false','DELIMITER'='|','bad_records_action'='fail')
""".stripMargin)

val hexHiveResult = sql("SELECT hex(binaryField) FROM hivetable")
val hexCarbonResult = sql("SELECT hex(binaryField) FROM carbontable")
checkAnswer(hexHiveResult, hexCarbonResult)
hexCarbonResult.collect().foreach { each =>
val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
assert("\u0001history\u0002".equals(result)
|| "\u0001biology\u0002".equals(result)
|| "\u0001education\u0002".equals(result))
}

val base64HiveResult = sql("SELECT base64(binaryField) FROM hivetable")
val base64CarbonResult = sql("SELECT base64(binaryField) FROM carbontable")
checkAnswer(base64HiveResult, base64CarbonResult)
base64CarbonResult.collect().foreach { each =>
val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
assert("\u0001history\u0002".equals(result)
|| "\u0001biology\u0002".equals(result)
|| "\u0001education\u0002".equals(result))
}


val hiveResult = sql("SELECT * FROM hivetable")
val carbonResult = sql("SELECT * FROM carbontable")
checkAnswer(hiveResult, carbonResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
package org.apache.spark.sql.carbondata.datasource

import java.io.File

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.util.BinaryUtil
import org.apache.commons.codec.binary.{Base64, Hex}
import org.apache.commons.io.FileUtils

import org.apache.spark.sql.Row
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil

import org.scalatest.{BeforeAndAfterAll, FunSuite}

class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
Expand Down Expand Up @@ -257,9 +255,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

test("insert into for hive and carbon, CTAS") {
sql("DROP TABLE IF EXISTS hiveTable")
sql("DROP TABLE IF EXISTS carbontable")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS hiveTable2")
sql("DROP TABLE IF EXISTS carbontable2")
sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
| CREATE TABLE IF NOT EXISTS hivetable (
Expand All @@ -275,17 +273,36 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

sql(
s"""
| CREATE TABLE IF NOT EXISTS carbontable (
| CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
| image binary,
| autoLabel boolean)
| using carbon
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
val carbonResult = sql("SELECT * FROM carbontable")
sql("insert into carbon_table values(1,true,'Bob','binary',false)")
sql("insert into carbon_table values(2,false,'Xu','test',true)")

val hexHiveResult = sql("SELECT hex(image) FROM hivetable")
val hexCarbonResult = sql("SELECT hex(image) FROM carbon_table")
checkAnswer(hexHiveResult, hexCarbonResult)
hexCarbonResult.collect().foreach { each =>
val result = new String(Hex.decodeHex((each.getAs[Array[Char]](0)).toString.toCharArray))
assert("binary".equals(result)
|| "test".equals(result))
}

val base64HiveResult = sql("SELECT base64(image) FROM hivetable")
val base64CarbonResult = sql("SELECT base64(image) FROM carbon_table")
checkAnswer(base64HiveResult, base64CarbonResult)
base64CarbonResult.collect().foreach { each =>
val result = new String(Base64.decodeBase64((each.getAs[Array[Char]](0)).toString))
assert("binary".equals(result)
|| "test".equals(result))
}

val carbonResult = sql("SELECT * FROM carbon_table")
val hiveResult = sql("SELECT * FROM hivetable")

assert(2 == carbonResult.collect().length)
Expand All @@ -301,19 +318,19 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}

sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable")
val carbonResult2 = sql("SELECT * FROM carbontable2")
sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable")
val carbonResult2 = sql("SELECT * FROM carbon_table2")
val hiveResult2 = sql("SELECT * FROM hivetable2")
checkAnswer(hiveResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
checkAnswer(hiveResult, hiveResult2)
assert(2 == carbonResult2.collect().length)
assert(2 == hiveResult2.collect().length)

sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
val carbonResult3 = sql("SELECT * FROM carbontable2")
sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
val carbonResult3 = sql("SELECT * FROM carbon_table2")
val hiveResult3 = sql("SELECT * FROM hivetable2")
checkAnswer(carbonResult3, hiveResult3)
assert(4 == carbonResult3.collect().length)
Expand All @@ -322,9 +339,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

test("insert into for parquet and carbon, CTAS") {
sql("DROP TABLE IF EXISTS parquetTable")
sql("DROP TABLE IF EXISTS carbontable")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS parquetTable2")
sql("DROP TABLE IF EXISTS carbontable2")
sql("DROP TABLE IF EXISTS carbon_table2")
sql(
s"""
| CREATE TABLE IF NOT EXISTS parquettable (
Expand All @@ -340,17 +357,17 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

sql(
s"""
| CREATE TABLE IF NOT EXISTS carbontable (
| CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
| image binary,
| autoLabel boolean)
| using carbon
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
val carbonResult = sql("SELECT * FROM carbontable")
sql("insert into carbon_table values(1,true,'Bob','binary',false)")
sql("insert into carbon_table values(2,false,'Xu','test',true)")
val carbonResult = sql("SELECT * FROM carbon_table")
val parquetResult = sql("SELECT * FROM parquettable")

assert(2 == carbonResult.collect().length)
Expand All @@ -366,19 +383,19 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}

sql("CREATE TABLE parquettable2 AS SELECT * FROM carbontable")
sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM parquettable")
val carbonResult2 = sql("SELECT * FROM carbontable2")
sql("CREATE TABLE parquettable2 AS SELECT * FROM carbon_table")
sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM parquettable")
val carbonResult2 = sql("SELECT * FROM carbon_table2")
val parquetResult2 = sql("SELECT * FROM parquettable2")
checkAnswer(parquetResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
checkAnswer(parquetResult, parquetResult2)
assert(2 == carbonResult2.collect().length)
assert(2 == parquetResult2.collect().length)

sql("INSERT INTO parquettable2 SELECT * FROM carbontable")
sql("INSERT INTO carbontable2 SELECT * FROM parquettable")
val carbonResult3 = sql("SELECT * FROM carbontable2")
sql("INSERT INTO parquettable2 SELECT * FROM carbon_table")
sql("INSERT INTO carbon_table2 SELECT * FROM parquettable")
val carbonResult3 = sql("SELECT * FROM carbon_table2")
val parquetResult3 = sql("SELECT * FROM parquettable2")
checkAnswer(carbonResult3, parquetResult3)
assert(4 == carbonResult3.collect().length)
Expand All @@ -387,9 +404,9 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

test("insert into carbon as select from hive after hive load data") {
sql("DROP TABLE IF EXISTS hiveTable")
sql("DROP TABLE IF EXISTS carbontable")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS hiveTable2")
sql("DROP TABLE IF EXISTS carbontable2")
sql("DROP TABLE IF EXISTS carbon_table2")

sql(
s"""
Expand All @@ -409,16 +426,28 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

sql(
s"""
| CREATE TABLE IF NOT EXISTS carbontable (
| CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
| image binary,
| autoLabel boolean)
| using carbon
""".stripMargin)
sql("insert into carbontable select * from hivetable")
val carbonResult = sql("SELECT * FROM carbontable")
sql("insert into carbon_table select * from hivetable")

sqlContext.udf.register("decodeHex", (str: String) => Hex.decodeHex(str.toCharArray))
sqlContext.udf.register("decodeBase64", (str: String) => Base64.decodeBase64(str.getBytes()))

val udfHexResult = sql("SELECT decodeHex(image) FROM carbon_table")
val unhexResult = sql("SELECT unhex(image) FROM carbon_table")
checkAnswer(udfHexResult, unhexResult)

val udfBase64Result = sql("SELECT decodeBase64(image) FROM carbon_table")
val unbase64Result = sql("SELECT unbase64(image) FROM carbon_table")
checkAnswer(udfBase64Result, unbase64Result)

val carbonResult = sql("SELECT * FROM carbon_table")
val hiveResult = sql("SELECT * FROM hivetable")

assert(3 == carbonResult.collect().length)
Expand All @@ -437,28 +466,29 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}
}

sql("CREATE TABLE hivetable2 AS SELECT * FROM carbontable")
sql("CREATE TABLE carbontable2 USING CARBON AS SELECT * FROM hivetable")
val carbonResult2 = sql("SELECT * FROM carbontable2")
sql("CREATE TABLE hivetable2 AS SELECT * FROM carbon_table")
sql("CREATE TABLE carbon_table2 USING CARBON AS SELECT * FROM hivetable")
val carbonResult2 = sql("SELECT * FROM carbon_table2")
val hiveResult2 = sql("SELECT * FROM hivetable2")
checkAnswer(hiveResult2, carbonResult2)
checkAnswer(carbonResult, carbonResult2)
checkAnswer(hiveResult, hiveResult2)
assert(3 == carbonResult2.collect().length)
assert(3 == hiveResult2.collect().length)

sql("INSERT INTO hivetable2 SELECT * FROM carbontable")
sql("INSERT INTO carbontable2 SELECT * FROM hivetable")
val carbonResult3 = sql("SELECT * FROM carbontable2")
sql("INSERT INTO hivetable2 SELECT * FROM carbon_table")
sql("INSERT INTO carbon_table2 SELECT * FROM hivetable")
val carbonResult3 = sql("SELECT * FROM carbon_table2")
val hiveResult3 = sql("SELECT * FROM hivetable2")
checkAnswer(carbonResult3, hiveResult3)
assert(6 == carbonResult3.collect().length)
assert(6 == hiveResult3.collect().length)

}

test("filter for hive and carbon") {
sql("DROP TABLE IF EXISTS hiveTable")
sql("DROP TABLE IF EXISTS carbontable")
sql("DROP TABLE IF EXISTS carbon_table")

sql(
s"""
Expand All @@ -475,20 +505,20 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

sql(
s"""
| CREATE TABLE IF NOT EXISTS carbontable (
| CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
| image binary,
| autoLabel boolean)
| using carbon
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
sql("insert into carbon_table values(1,true,'Bob','binary',false)")
sql("insert into carbon_table values(2,false,'Xu','test',true)")

// filter with equal
val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)")
val carbonResult = sql("SELECT * FROM carbon_table where image=cast('binary' as binary)")

checkAnswer(hiveResult, carbonResult)
assert(1 == carbonResult.collect().length)
Expand All @@ -499,13 +529,13 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

// filter with non string
val exception = intercept[Exception] {
sql("SELECT * FROM carbontable where image=binary").collect()
sql("SELECT * FROM carbon_table where image=binary").collect()
}
assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))

// filter with not equal
val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)")
val carbonResult3 = sql("SELECT * FROM carbon_table where image!=cast('binary' as binary)")
checkAnswer(hiveResult3, carbonResult3)
assert(1 == carbonResult3.collect().length)
carbonResult3.collect().foreach { each =>
Expand All @@ -515,7 +545,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

// filter with in
val hiveResult4 = sql("SELECT * FROM hivetable where image in (cast('binary' as binary))")
val carbonResult4 = sql("SELECT * FROM carbontable where image in (cast('binary' as binary))")
val carbonResult4 = sql("SELECT * FROM carbon_table where image in (cast('binary' as binary))")
checkAnswer(hiveResult4, carbonResult4)
assert(1 == carbonResult4.collect().length)
carbonResult4.collect().foreach { each =>
Expand All @@ -525,7 +555,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

// filter with not in
val hiveResult5 = sql("SELECT * FROM hivetable where image not in (cast('binary' as binary))")
val carbonResult5 = sql("SELECT * FROM carbontable where image not in (cast('binary' as binary))")
val carbonResult5 = sql("SELECT * FROM carbon_table where image not in (cast('binary' as binary))")
checkAnswer(hiveResult5, carbonResult5)
assert(1 == carbonResult5.collect().length)
carbonResult5.collect().foreach { each =>
Expand All @@ -535,23 +565,23 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}

test("Spark DataSource don't support update, delete") {
sql("DROP TABLE IF EXISTS carbontable")
sql("DROP TABLE IF EXISTS carbontable2")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_table2")

sql(
s"""
| CREATE TABLE IF NOT EXISTS carbontable (
| CREATE TABLE IF NOT EXISTS carbon_table (
| id int,
| label boolean,
| name string,
| binaryField binary,
| autoLabel boolean)
| using carbon
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
sql("insert into carbon_table values(1,true,'Bob','binary',false)")
sql("insert into carbon_table values(2,false,'Xu','test',true)")

val carbonResult = sql("SELECT * FROM carbontable")
val carbonResult = sql("SELECT * FROM carbon_table")

carbonResult.collect().foreach { each =>
if (1 == each.get(0)) {
Expand All @@ -564,12 +594,12 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}

var exception = intercept[Exception] {
sql("UPDATE carbontable SET binaryField = 'binary2' WHERE id = 1").show()
sql("UPDATE carbon_table SET binaryField = 'binary2' WHERE id = 1").show()
}
assert(exception.getMessage.contains("mismatched input 'UPDATE' expecting"))

exception = intercept[Exception] {
sql("DELETE FROM carbontable WHERE id = 1").show()
sql("DELETE FROM carbon_table WHERE id = 1").show()
}
assert(exception.getMessage.contains("Operation not allowed: DELETE FROM"))
}
Expand Down

0 comments on commit c497142

Please sign in to comment.