[CARBONDATA-3230] Add alter test case for datasource
[CARBONDATA-3230] Add ALTER test cases for datasource tables using parquet and carbon

1. add column => not supported for carbon (limitation from Spark); supported for parquet.
2. drop column => not supported in SQL for either carbon or parquet (limitation from Spark), but dropping the column on a DataFrame works for both (see the sketch after this list).
3. rename table => supported for both carbon and parquet.
4. change column data type => not supported for either carbon or parquet (limitation from Spark); Spark only supports changing a column's comment.
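
For case 2, a minimal sketch of the DataFrame route (assuming a running SparkSession named spark and an existing table named carbon_table; carbon_table_new is a hypothetical target name):

    // SQL has no DROP COLUMNS, but reading the table, dropping the column on the
    // DataFrame, and saving the result achieves the same effect.
    val df = spark.read
      .format("carbon")                     // or "parquet"
      .option("tableName", "carbon_table")
      .load()
    df.drop("c1")                           // remove column c1
      .write
      .format("carbon")
      .saveAsTable("carbon_table_new")      // hypothetical new table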

This closes #3024
xubo245 authored and ravipesala committed Jan 8, 2019
1 parent c0ba982 commit 4e27b86
Showing 1 changed file with 327 additions and 0 deletions.
@@ -26,6 +26,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField => SparkStructField, StructType}
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}

@@ -77,6 +78,332 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists testformat")
}

test("test add columns for table of using carbon with sql") {
// TODO: should support add columns for carbon dataSource table
// Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
// Saves dataFrame to carbon file
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
sql("ALTER TABLE carbon_table ADD COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER ADD COLUMNS does not support datasource table with type carbon."))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}
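
The try/catch plus assert(false) pattern above does the job, but ScalaTest's intercept (already available through FunSuite) states "this statement must throw" more directly; a sketch of the equivalent assertion, not part of the committed test:

    // intercept fails the test if no exception is thrown and returns the
    // exception so its message can be checked.
    val e = intercept[Exception] {
      sql("ALTER TABLE carbon_table ADD COLUMNS (a1 INT, b1 STRING)")
    }
    assert(e.getMessage.contains(
      "ALTER ADD COLUMNS does not support datasource table with type carbon."))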

test("test add columns for table of using carbon with DF") {
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS carbon_table")
// Saves dataFrame to carbon file
df.write
.format("carbon").saveAsTable("carbon_table")
val customSchema = StructType(Array(
SparkStructField("c1", StringType),
SparkStructField("c2", StringType),
SparkStructField("number", IntegerType)))

val carbonDF = spark.read
.format("carbon")
.option("tableName", "carbon_table")
.schema(customSchema)
.load()

assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number"))
val carbonDF2 = carbonDF.drop("c1")
assert(carbonDF2.schema.map(_.name) === Seq("c2", "number"))
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test drop columns for table of using carbon") {
// TODO: should support drop columns for carbon dataSource table
// Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
// Saves dataFrame to carbon file
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
sql("ALTER TABLE carbon_table drop COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test rename table name for table of using carbon") {
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_table2")
// Saves dataFrame to carbon file
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
sql("ALTER TABLE carbon_table RENAME TO carbon_table2 ")
checkAnswer(sql("SELECT COUNT(*) FROM carbon_table2"), Seq(Row(10)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test change data type for table of using carbon") {
//TODO: Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_table2")
// Saves dataFrame to carbon file
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number decimal(8,2)) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
sql("ALTER TABLE carbon_table change number number decimal(9,4)")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test add columns for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._
try {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")
sql("ALTER TABLE test_parquet ADD COLUMNS(a1 INT, b1 STRING) ")
sql("INSERT INTO test_parquet VALUES('Bob','xu',12,1,'parquet')")
TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet"), Seq(Row(11)))

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
sql("ALTER TABLE test_parquet2 ADD COLUMNS (a1 INT, b1 STRING) ")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12,1,'parquet')")
TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet2"), Seq(Row(2)))
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
}
}
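
One detail the test above does not assert: rows written before the ADD COLUMNS read back with NULL in the new columns. A quick way to see that, reusing the test's table (a sketch):

    // Rows inserted before the ALTER predate a1/b1 and surface as NULL
    sql("SELECT c1, a1, b1 FROM test_parquet WHERE c1 = 'a1'").show()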

test("test drop columns for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")

val df2 = df.drop("c1")

assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
assert(df2.schema.map(_.name) === Seq("c2", "number"))

try {
sql("ALTER TABLE test_parquet DROP COLUMNS(c1)")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
}

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 DROP COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
}
}

test("test rename table name for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
sql("DROP TABLE IF EXISTS test_parquet3")
sql("DROP TABLE IF EXISTS test_parquet22")
df.write
.format("parquet").saveAsTable("test_parquet")

try {
sql("ALTER TABLE test_parquet rename to test_parquet3")
checkAnswer(sql("SELECT COUNT(*) FROM test_parquet3"), Seq(Row(10)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet3")
}

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 rename to test_parquet22")
checkAnswer(sql("SELECT COUNT(*) FROM test_parquet22"), Seq(Row(1)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
sql("DROP TABLE IF EXISTS test_parquet22")
}
}

test("test change data type for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")
try {
sql("ALTER TABLE test_parquet CHANGE number number long")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
}
sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 CHANGE number number long")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
}
}
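
In the Spark versions these tests run against, ALTER TABLE ... CHANGE requires the new column definition to keep the same name and type, so in practice the only thing it can change is the column comment. A sketch of the one variant that should succeed, reusing test_parquet2 from the test above:

    // Same name, same type; only the comment differs, which Spark allows
    sql("ALTER TABLE test_parquet2 CHANGE number number INT COMMENT 'row count'")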

test("test read with df write") {
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
import spark.implicits._
