Skip to content

Commit

Permalink
fix spark2.1 CI error
Browse files Browse the repository at this point in the history
  • Loading branch information
xubo245 committed Apr 22, 2019
1 parent 98600fb commit b6c15ca
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 88 deletions.
Expand Up @@ -27,6 +27,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.commons.codec.binary.Hex
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll

/**
Expand Down Expand Up @@ -340,8 +341,9 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where id =1"), Seq(Row(1)))


sql("insert into binaryTable values(1,true,'Bob','hello',false)")
checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField ='hello'"), Seq(Row(1)))
checkAnswer(sql("SELECT COUNT(*) FROM binaryTable where binaryField =cast('hello' as binary)"), Seq(Row(1)))
}

test("Test create table with buckets unsafe") {
Expand Down Expand Up @@ -446,7 +448,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
sql("insert into hivetable values(1,true,'Bob','binary',false)")
sql("insert into hivetable values(2,false,'Xu','test',true)")
val hiveResult = sql("SELECT * FROM hivetable where binaryField='binary'")
val hiveResult = sql("SELECT * FROM hivetable where binaryField=cast('binary' as binary)")

sql(
s"""
Expand All @@ -460,7 +462,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
val carbonResult = sql("SELECT * FROM carbontable where binaryField='binary'")
val carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
checkAnswer(hiveResult, carbonResult)
assert(1 == carbonResult.collect().length)
carbonResult.collect().foreach { each =>
Expand All @@ -480,8 +482,8 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))

// filter with not equal
val hiveResult3 = sql("SELECT * FROM hivetable where binaryField!='binary'")
val carbonResult3 = sql("SELECT * FROM carbontable where binaryField!='binary'")
val hiveResult3 = sql("SELECT * FROM hivetable where binaryField!=cast('binary' as binary)")
val carbonResult3 = sql("SELECT * FROM carbontable where binaryField!=cast('binary' as binary)")
checkAnswer(hiveResult3, carbonResult3)
assert(1 == carbonResult3.collect().length)
carbonResult3.collect().foreach { each =>
Expand Down Expand Up @@ -525,7 +527,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
sql("insert into carbontable values(1,true,'Bob','binary',false)")
sql("insert into carbontable values(2,false,'Xu','test',true)")
var carbonResult = sql("SELECT * FROM carbontable where binaryField='binary'")
var carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
assert(1 == carbonResult.collect().length)
carbonResult.collect().foreach { each =>
if (1 == each.get(0)) {
Expand All @@ -541,7 +543,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
sql("UPDATE carbontable SET (name) = ('David') WHERE id = 1").show()
sql("UPDATE carbontable SET (binaryField) = ('carbon2') WHERE id = 1").show()

carbonResult = sql("SELECT * FROM carbontable where binaryField='binary'")
carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
carbonResult.collect().foreach { each =>
if (1 == each.get(0)) {
assert("carbon2".equals(new String(each.getAs[Array[Byte]](3))))
Expand All @@ -565,7 +567,7 @@ class TestBinaryDataType extends QueryTest with BeforeAndAfterAll {
// Test delete
sql("DELETE FROM carbontable WHERE id = 2").show()

carbonResult = sql("SELECT * FROM carbontable where binaryField='binary'")
carbonResult = sql("SELECT * FROM carbontable where binaryField=cast('binary' as binary)")
carbonResult.collect().foreach { each =>
if (1 == each.get(0)) {
assert("carbon2".equals(new String(each.getAs[Array[Byte]](3))))
Expand Down
Expand Up @@ -76,35 +76,36 @@ class TestNonTransactionalCarbonTableForBinary extends QueryTest with BeforeAndA
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql(s"CREATE EXTERNAL TABLE binaryCarbon STORED BY 'carbondata' LOCATION '$writerPath'")
sql(s"CREATE TABLE binaryCarbon3 STORED BY 'carbondata' LOCATION '$outputPath'" + " AS SELECT * FROM binaryCarbon")
}
checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
Seq(Row(3)))
checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
Seq(Row(3)))

val result = sql("desc formatted binaryCarbon").collect()
var flag = false
result.foreach { each =>
if ("binary".equals(each.get(1))) {
flag = true

checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon"),
Seq(Row(3)))
checkAnswer(sql("SELECT COUNT(*) FROM binaryCarbon3"),
Seq(Row(3)))

val result = sql("desc formatted binaryCarbon").collect()
var flag = false
result.foreach { each =>
if ("binary".equals(each.get(1))) {
flag = true
}
}
assert(flag)
val value = sql("SELECT * FROM binaryCarbon").collect()
assert(3 == value.length)
value.foreach { each =>
val byteArray = each.getAs[Array[Byte]](2)
assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
}
}
assert(flag)
val value = sql("SELECT * FROM binaryCarbon").collect()
assert(3 == value.length)
value.foreach { each =>
val byteArray = each.getAs[Array[Byte]](2)
assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
}

val value3 = sql("SELECT * FROM binaryCarbon3").collect()
assert(3 == value3.length)
value3.foreach { each =>
val byteArray = each.getAs[Array[Byte]](2)
assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
val value3 = sql("SELECT * FROM binaryCarbon3").collect()
assert(3 == value3.length)
value3.foreach { each =>
val byteArray = each.getAs[Array[Byte]](2)
assert(new String(byteArray).startsWith("����\u0000\u0010JFIF"))
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")
}

test("Don't support insert into partition table") {
Expand All @@ -131,30 +132,31 @@ class TestNonTransactionalCarbonTableForBinary extends QueryTest with BeforeAndA
| labelName STRING,
| labelContent STRING
|) partitioned by ( binary BINARY) STORED BY 'carbondata'""".stripMargin)

sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult2 = sql("SELECT * FROM binaryCarbon2")

sql("create table binaryCarbon4 STORED BY 'carbondata' select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
val carbonResult = sql("SELECT * FROM binaryCarbon")

assert(3 == carbonResult.collect().length)
assert(1 == carbonResult4.collect().length)
assert(1 == carbonResult2.collect().length)
checkAnswer(carbonResult4, carbonResult2)

try {
sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
assert(false)
} catch {
case e: Exception =>
e.printStackTrace()
assert(true)
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
}
sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult2 = sql("SELECT * FROM binaryCarbon2")

sql("create table binaryCarbon4 STORED BY 'carbondata' select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
val carbonResult = sql("SELECT * FROM binaryCarbon")

assert(3 == carbonResult.collect().length)
assert(1 == carbonResult4.collect().length)
assert(1 == carbonResult2.collect().length)
checkAnswer(carbonResult4, carbonResult2)

try {
sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
assert(false)
} catch {
case e: Exception =>
e.printStackTrace()
assert(true)
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
}
}
Expand Up @@ -46,6 +46,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {

def buildTestBinaryData(): Any = {
FileUtils.deleteDirectory(new File(writerPath))
FileUtils.deleteDirectory(new File(outputPath))

val sourceImageFolder = sdkPath + "/src/test/resources/image/flowers"
val sufAnnotation = ".txt"
Expand Down Expand Up @@ -84,6 +85,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
test("Test read image carbon with spark carbon file format, generate by sdk, CTAS") {
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")
FileUtils.deleteDirectory(new File(outputPath))
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
sql(s"CREATE TABLE binaryCarbon USING CARBON OPTIONS(PATH '$writerPath')")
sql(s"CREATE TABLE binaryCarbon3 USING CARBON OPTIONS(PATH '$outputPath')" + " AS SELECT * FROM binaryCarbon")
Expand All @@ -97,6 +99,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
Seq(Row(3)))
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")
FileUtils.deleteDirectory(new File(outputPath))
}

test("Don't support sort_columns") {
Expand Down Expand Up @@ -140,11 +143,11 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
}

test("Don't support insert into partition table") {
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
if (SparkUtil.isSparkVersionXandAbove("2.2")) {
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
sql(s"CREATE TABLE binaryCarbon USING CARBON LOCATION '$writerPath'")
sql(
s"""
Expand All @@ -165,36 +168,38 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
| labelContent STRING
|) USING CARBON partitioned by (binary) """.stripMargin)
sql("select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0").show()
}
sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult2 = sql("SELECT * FROM binaryCarbon2")

sql("create table binaryCarbon4 using carbon select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
val carbonResult = sql("SELECT * FROM binaryCarbon")
sql("insert into binaryCarbon2 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult2 = sql("SELECT * FROM binaryCarbon2")

assert(3 == carbonResult.collect().length)
assert(1 == carbonResult4.collect().length)
assert(1 == carbonResult2.collect().length)
checkAnswer(carbonResult4, carbonResult2)

try {
sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
assert(false)
} catch {
case e: Exception =>
e.printStackTrace()
assert(true)
sql("create table binaryCarbon4 using carbon select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
val carbonResult4 = sql("SELECT * FROM binaryCarbon4")
val carbonResult = sql("SELECT * FROM binaryCarbon")

assert(3 == carbonResult.collect().length)
assert(1 == carbonResult4.collect().length)
assert(1 == carbonResult2.collect().length)
checkAnswer(carbonResult4, carbonResult2)

try {
sql("insert into binaryCarbon3 select binaryId,binaryName,binary,labelName,labelContent from binaryCarbon where binaryId=0 ")
assert(false)
} catch {
case e: Exception =>
e.printStackTrace()
assert(true)
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
}
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon2")
sql("DROP TABLE IF EXISTS binaryCarbon3")
sql("DROP TABLE IF EXISTS binaryCarbon4")
}

test("Test unsafe as false") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
FileUtils.deleteDirectory(new File(outputPath))
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
Expand All @@ -211,6 +216,7 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS binaryCarbon")
sql("DROP TABLE IF EXISTS binaryCarbon3")

FileUtils.deleteDirectory(new File(outputPath))
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
Expand Down Expand Up @@ -448,8 +454,8 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
sql("insert into carbontable values(2,false,'Xu','test',true)")

// filter with equal
val hiveResult = sql("SELECT * FROM hivetable where image='binary'")
val carbonResult = sql("SELECT * FROM carbontable where image='binary'")
val hiveResult = sql("SELECT * FROM hivetable where image=cast('binary' as binary)")
val carbonResult = sql("SELECT * FROM carbontable where image=cast('binary' as binary)")

checkAnswer(hiveResult, carbonResult)
assert(1 == carbonResult.collect().length)
Expand All @@ -465,8 +471,8 @@ class SparkCarbonDataSourceBinaryTest extends FunSuite with BeforeAndAfterAll {
assert(exception.getMessage.contains("cannot resolve '`binary`' given input columns"))

// filter with not equal
val hiveResult3 = sql("SELECT * FROM hivetable where image!='binary'")
val carbonResult3 = sql("SELECT * FROM carbontable where image!='binary'")
val hiveResult3 = sql("SELECT * FROM hivetable where image!=cast('binary' as binary)")
val carbonResult3 = sql("SELECT * FROM carbontable where image!=cast('binary' as binary)")
checkAnswer(hiveResult3, carbonResult3)
assert(1 == carbonResult3.collect().length)
carbonResult3.collect().foreach { each =>
Expand Down

0 comments on commit b6c15ca

Please sign in to comment.