[CARBONDATA-3230] Add alter test case for datasource #3024
Closed
@@ -26,6 +26,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField => SparkStructField, StructType}
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}

@@ -77,6 +78,332 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists testformat")
}

test("test add columns for table of using carbon with sql") {
// TODO: should support add columns for carbon dataSource table
// Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
// Save the dataframe as a parquet table
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
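// ADD COLUMNS is expected to fail for carbon datasource tables; assert(false) guards against it silently succeeding.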
sql("ALTER TABLE carbon_table ADD COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER ADD COLUMNS does not support datasource table with type carbon."))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test add columns for table of using carbon with DF") {
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS carbon_table")
// Save the dataframe as a carbon table
df.write
.format("carbon").saveAsTable("carbon_table")
val customSchema = StructType(Array(
SparkStructField("c1", StringType),
SparkStructField("c2", StringType),
SparkStructField("number", IntegerType)))

val carbonDF = spark.read
.format("carbon")
.option("tableName", "carbon_table")
.schema(customSchema)
.load()

assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number"))
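// drop() only removes the column from the returned DataFrame; the underlying table schema is unchanged.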
val carbonDF2 = carbonDF.drop("c1")
assert(carbonDF2.schema.map(_.name) === Seq("c2", "number"))
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test drop columns for table of using carbon") {
// TODO: should support drop columns for carbon dataSource table
// Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
// Save the dataframe as a parquet table
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
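// Spark 2.x SQL has no DROP COLUMNS syntax, so this statement is rejected by the parser.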
sql("ALTER TABLE carbon_table drop COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test rename table name for table of using carbon") {
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_table2")
// Save the dataframe as a parquet table
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
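// RENAME TO is supported for carbon datasource tables; the data should remain readable under the new name.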
sql("ALTER TABLE carbon_table RENAME TO carbon_table2 ")
checkAnswer(sql("SELECT COUNT(*) FROM carbon_table2"), Seq(Row(10)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test change data type for table of using carbon") {
//TODO: Limit from spark
import spark.implicits._
import spark._
try {
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
sql("DROP TABLE IF EXISTS carbon_table2")
// Save the dataframe as a parquet table
df.write
.format("parquet").saveAsTable("test_parquet")
sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number decimal(8,2)) USING carbon")
sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
sql("SELECT * FROM test_parquet WHERE c1='a1'"))
if (!sparkContext.version.startsWith("2.1")) {
val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
}
assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
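// Spark does not allow changing a column's type (here decimal(8,2) to decimal(9,4)) via CHANGE COLUMN.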
sql("ALTER TABLE carbon_table change number number decimal(9,4)")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS carbon_table")
}
}

test("test add columns for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._
try {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")
sql("ALTER TABLE test_parquet ADD COLUMNS(a1 INT, b1 STRING) ")
sql("INSERT INTO test_parquet VALUES('Bob','xu',12,1,'parquet')")
TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet"), Seq(Row(11)))

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
sql("ALTER TABLE test_parquet2 ADD COLUMNS (a1 INT, b1 STRING) ")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12,1,'parquet')")
TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet2"), Seq(Row(2)))
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
}
}

test("test drop columns for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")

val df2 = df.drop("c1")

assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
assert(df2.schema.map(_.name) === Seq("c2", "number"))

try {
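// As with carbon tables, DROP COLUMNS is rejected at parse time for parquet tables.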
sql("ALTER TABLE test_parquet DROP COLUMNS(c1)")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
}

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 DROP COLUMNS (a1 INT, b1 STRING) ")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
}
}

test("test rename table name for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
sql("DROP TABLE IF EXISTS test_parquet3")
sql("DROP TABLE IF EXISTS test_parquet22")
df.write
.format("parquet").saveAsTable("test_parquet")

try {
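// Renaming a parquet table created via saveAsTable should succeed and keep the data readable.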
sql("ALTER TABLE test_parquet rename to test_parquet3")
checkAnswer(sql("SELECT COUNT(*) FROM test_parquet3"), Seq(Row(10)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet3")
}

sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 rename to test_parquet22")
checkAnswer(sql("SELECT COUNT(*) FROM test_parquet22"), Seq(Row(1)));
} catch {
case e: Exception =>
e.printStackTrace()
assert(false)
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
sql("DROP TABLE IF EXISTS test_parquet22")
}
}

test("test change data type for table of using parquet") {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
import spark._

sql("DROP TABLE IF EXISTS test_parquet")
sql("DROP TABLE IF EXISTS test_parquet2")
df.write
.format("parquet").saveAsTable("test_parquet")
try {
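// Even a widening conversion (INT to LONG) is rejected by CHANGE COLUMN.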
sql("ALTER TABLE test_parquet CHANGE number number long")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet")
}
sql("DROP TABLE IF EXISTS test_parquet2")
sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
try {
sql("ALTER TABLE test_parquet2 CHANGE number number long")
assert(false)
} catch {
case e: Exception =>
assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
} finally {
sql("DROP TABLE IF EXISTS test_parquet2")
}
}

test("test read with df write") {
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
import spark.implicits._