diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 026c5ca7bd1..f89c1a8c6b6 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField => SparkStructField, StructType}
 import org.apache.spark.util.SparkUtil
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
@@ -77,6 +78,332 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists testformat")
   }
 
+  test("test add columns for table using carbon with sql") {
+    // TODO: should support ADD COLUMNS for carbon dataSource tables;
+    // currently rejected due to a limitation in Spark.
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      // Save the dataFrame as a parquet table
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon")
+      sql("INSERT INTO carbon_table SELECT * FROM test_parquet")
+      TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"),
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table ADD COLUMNS (a1 INT, b1 STRING)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER ADD COLUMNS does not support datasource table with type carbon."))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test add columns for table using carbon with DF") {
+    import spark.implicits._
+    import spark._
+    try {
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+      sql("DROP TABLE IF EXISTS carbon_table")
+      // Save the dataFrame as a carbon table
+      df.write
+        .format("carbon").saveAsTable("carbon_table")
+      val customSchema = StructType(Array(
+        SparkStructField("c1", StringType),
+        SparkStructField("c2", StringType),
+        SparkStructField("number", IntegerType)))
+
+      val carbonDF = spark.read
+        .format("carbon")
+        .option("tableName", "carbon_table")
+        .schema(customSchema)
+        .load()
+
+      assert(carbonDF.schema.map(_.name) === Seq("c1", "c2", "number"))
+      val carbonDF2 = carbonDF.drop("c1")
+      assert(carbonDF2.schema.map(_.name) === Seq("c2", "number"))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
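+  // Note: Spark's SQL parser has no "DROP COLUMNS" clause at all, so the
+  // drop-columns tests below expect a parse error ("mismatched input 'COLUMNS'")
+  // rather than an analysis error; dropping a column only works through the
+  // DataFrame API (df.drop), as exercised in this suite.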
test("test drop columns for table of using carbon") { + // TODO: should support drop columns for carbon dataSource table + // Limit from spark + import spark.implicits._ + import spark._ + try { + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + sql("DROP TABLE IF EXISTS test_parquet") + sql("DROP TABLE IF EXISTS carbon_table") + // Saves dataFrame to carbon file + df.write + .format("parquet").saveAsTable("test_parquet") + sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon") + sql("INSERT INTO carbon_table SELECT * FROM test_parquet") + TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"), + sql("SELECT * FROM test_parquet WHERE c1='a1'")) + if (!sparkContext.version.startsWith("2.1")) { + val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size() + DataMapStoreManager.getInstance() + .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table")) + assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size()) + } + assert(df.schema.map(_.name) === Seq("c1", "c2", "number")) + sql("ALTER TABLE carbon_table drop COLUMNS (a1 INT, b1 STRING) ") + assert(false) + } catch { + case e: Exception => + assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting")) + } finally { + sql("DROP TABLE IF EXISTS test_parquet") + sql("DROP TABLE IF EXISTS carbon_table") + } + } + + test("test rename table name for table of using carbon") { + import spark.implicits._ + import spark._ + try { + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + sql("DROP TABLE IF EXISTS test_parquet") + sql("DROP TABLE IF EXISTS carbon_table") + sql("DROP TABLE IF EXISTS carbon_table2") + // Saves dataFrame to carbon file + df.write + .format("parquet").saveAsTable("test_parquet") + sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number INT) USING carbon") + sql("INSERT INTO carbon_table SELECT * FROM test_parquet") + TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"), + sql("SELECT * FROM test_parquet WHERE c1='a1'")) + if (!sparkContext.version.startsWith("2.1")) { + val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size() + DataMapStoreManager.getInstance() + .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table")) + assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size()) + } + assert(df.schema.map(_.name) === Seq("c1", "c2", "number")) + sql("ALTER TABLE carbon_table RENAME TO carbon_table2 ") + checkAnswer(sql("SELECT COUNT(*) FROM carbon_table2"), Seq(Row(10))); + } catch { + case e: Exception => + e.printStackTrace() + assert(false) + } finally { + sql("DROP TABLE IF EXISTS test_parquet") + sql("DROP TABLE IF EXISTS carbon_table") + } + } + + test("test change data type for table of using carbon") { + //TODO: Limit from spark + import spark.implicits._ + import spark._ + try { + val df = spark.sparkContext.parallelize(1 to 10) + .map(x => ("a" + x % 10, "b", x)) + .toDF("c1", "c2", "number") + sql("DROP TABLE IF EXISTS test_parquet") + sql("DROP TABLE IF EXISTS carbon_table") + sql("DROP TABLE IF EXISTS carbon_table2") + // Saves dataFrame to carbon file + df.write + .format("parquet").saveAsTable("test_parquet") + sql("CREATE TABLE carbon_table(c1 STRING, c2 STRING, number decimal(8,2)) USING carbon") + sql("INSERT INTO carbon_table SELECT * FROM test_parquet") + TestUtil.checkAnswer(sql("SELECT * FROM carbon_table WHERE c1='a1'"), 
+        sql("SELECT * FROM test_parquet WHERE c1='a1'"))
+      if (!sparkContext.version.startsWith("2.1")) {
+        val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+        DataMapStoreManager.getInstance()
+          .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+        assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      }
+      assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+      sql("ALTER TABLE carbon_table CHANGE number number decimal(9,4)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS carbon_table")
+    }
+  }
+
+  test("test add columns for table using parquet") {
+    import spark.implicits._
+    import spark._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+    try {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      df.write
+        .format("parquet").saveAsTable("test_parquet")
+      sql("ALTER TABLE test_parquet ADD COLUMNS(a1 INT, b1 STRING)")
+      sql("INSERT INTO test_parquet VALUES('Bob','xu',12,1,'parquet')")
+      TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet"), Seq(Row(11)))
+
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+      sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+      sql("ALTER TABLE test_parquet2 ADD COLUMNS (a1 INT, b1 STRING)")
+      sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12,1,'parquet')")
+      TestUtil.checkAnswer(sql("SELECT COUNT(*) FROM test_parquet2"), Seq(Row(2)))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
+  test("test drop columns for table using parquet") {
+    import spark.implicits._
+    import spark._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+
+    // Dropping a column works on the DataFrame, but not through ALTER TABLE
+    val df2 = df.drop("c1")
+
+    assert(df.schema.map(_.name) === Seq("c1", "c2", "number"))
+    assert(df2.schema.map(_.name) === Seq("c2", "number"))
+
+    try {
+      sql("ALTER TABLE test_parquet DROP COLUMNS(c1)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 DROP COLUMNS (a1 INT, b1 STRING)")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("mismatched input 'COLUMNS' expecting"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
+  test("test rename table name for table using parquet") {
+    import spark.implicits._
+    import spark._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("DROP TABLE IF EXISTS test_parquet3")
+    sql("DROP TABLE IF EXISTS test_parquet22")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+
+    try {
+      sql("ALTER TABLE test_parquet RENAME TO test_parquet3")
+      checkAnswer(sql("SELECT COUNT(*) FROM test_parquet3"), Seq(Row(10)))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+      sql("DROP TABLE IF EXISTS test_parquet3")
+    }
+
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 RENAME TO test_parquet22")
+      checkAnswer(sql("SELECT COUNT(*) FROM test_parquet22"), Seq(Row(1)))
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        assert(false)
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+      sql("DROP TABLE IF EXISTS test_parquet22")
+    }
+  }
+
+  test("test change data type for table using parquet") {
+    import spark.implicits._
+    import spark._
+    val df = spark.sparkContext.parallelize(1 to 10)
+      .map(x => ("a" + x % 10, "b", x))
+      .toDF("c1", "c2", "number")
+
+    sql("DROP TABLE IF EXISTS test_parquet")
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    df.write
+      .format("parquet").saveAsTable("test_parquet")
+    try {
+      sql("ALTER TABLE test_parquet CHANGE number number long")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet")
+    }
+    sql("DROP TABLE IF EXISTS test_parquet2")
+    sql("CREATE TABLE test_parquet2(c1 STRING, c2 STRING, number INT) USING parquet")
+    sql("INSERT INTO test_parquet2 VALUES('Bob','xu',12)")
+    try {
+      sql("ALTER TABLE test_parquet2 CHANGE number number long")
+      assert(false)
+    } catch {
+      case e: Exception =>
+        assert(e.getMessage.contains("ALTER TABLE CHANGE COLUMN is not supported for changing column"))
+    } finally {
+      sql("DROP TABLE IF EXISTS test_parquet2")
+    }
+  }
+
   test("test read with df write") {
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     import spark.implicits._