Fix tests 2.1
ravipesala committed Aug 28, 2018
1 parent 4a5a7ef · commit febd215
Showing 2 changed files with 52 additions and 31 deletions.
@@ -55,9 +55,12 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon")
     spark.sql("insert into carbon_table select * from testparquet")
     TestUtil.checkAnswer(spark.sql("select * from carbon_table where c1='a1'"), spark.sql("select * from testparquet where c1='a1'"))
-    val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
-    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(warehouse1+"/carbon_table"))
-    assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+    }
     spark.sql("drop table if exists testparquet")
     spark.sql("drop table if exists testformat")
   }
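Note: this is the recurring shape of the fix in this commit — the DataMapStoreManager bookkeeping assertions are skipped when the session runs on Spark 2.1, which is what the commit title "Fix tests 2.1" refers to. A minimal sketch of the guard as a reusable helper (the name ifNotSpark21 is hypothetical, not part of this change):

import org.apache.spark.sql.SparkSession

// Hypothetical helper: run the assertion block only when not on Spark 2.1.x.
def ifNotSpark21(spark: SparkSession)(body: => Unit): Unit =
  if (!spark.sparkContext.version.startsWith("2.1")) body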
@@ -79,23 +82,27 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
   }
 
   test("test write using subfolder") {
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
-    import spark.implicits._
-    val df = spark.sparkContext.parallelize(1 to 10)
-      .map(x => ("a" + x % 10, "b", x))
-      .toDF("c1", "c2", "number")
-
-    // Saves dataframe to carbon file
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-    df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
-
-    val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
-    assert(frame.where("c1='a1'").count() == 3)
-    val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
-    DataMapStoreManager.getInstance().clearDataMaps(AbsoluteTableIdentifier.from(warehouse1+"/test_folder"))
-    assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
-    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+      import spark.implicits._
+      val df = spark.sparkContext.parallelize(1 to 10)
+        .map(x => ("a" + x % 10, "b", x))
+        .toDF("c1", "c2", "number")
+
+      // Saves dataframe to carbon file
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+      df.write.format("carbon").save(warehouse1 + "/test_folder/"+System.nanoTime())
+
+      val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
+      assert(frame.where("c1='a1'").count() == 3)
+
+      val mapSize = DataMapStoreManager.getInstance().getAllDataMaps.size()
+      DataMapStoreManager.getInstance()
+        .clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/test_folder"))
+      assert(mapSize > DataMapStoreManager.getInstance().getAllDataMaps.size())
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
+    }
   }
 
   test("test write using partition ddl") {
@@ -112,7 +119,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon PARTITIONED by (c2)")
     spark.sql("insert into carbon_table select * from testparquet")
     // TODO fix in 2.1
-    if (!spark.sparkContext.version.contains("2.1")) {
+    if (!spark.sparkContext.version.startsWith("2.1")) {
       assert(spark.sql("select * from carbon_table").count() == 10)
       TestUtil
         .checkAnswer(spark.sql("select * from carbon_table"),
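This hunk is a correctness fix to an existing guard rather than a new one: contains("2.1") also matches version strings such as "2.2.1", which would wrongly skip the partition checks there. A quick illustration (the version strings are examples, not taken from the commit):

val needle = "2.1"
Seq("2.1.0", "2.2.1", "2.3.0").foreach { v =>
  println(s"$v -> contains=${v.contains(needle)}, startsWith=${v.startsWith(needle)}")
}
// 2.1.0 -> contains=true,  startsWith=true
// 2.2.1 -> contains=true,  startsWith=false   (the case contains got wrong)
// 2.3.0 -> contains=false, startsWith=false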
@@ -336,9 +343,13 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
     // Saves dataframe to carbon file
     df.write.format("carbon").save(warehouse1 + "/test_folder/")
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
 
-    spark.sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
-    checkAnswer(spark.sql("select * from test123"), spark.read.format("carbon").load(warehouse1 + "/test_folder/"))
+      checkAnswer(spark.sql("select * from test123"),
+        spark.read.format("carbon").load(warehouse1 + "/test_folder/"))
+    }
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     spark.sql("drop table if exists test123")
   }
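Note: the guard now encloses both the `using carbon location` DDL and the checkAnswer. The test123 table is defined over the folder the DataFrame writer just produced, so the comparison is between the SQL-table path and a direct spark.read of the same files.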
@@ -348,10 +359,16 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     spark.sql("drop table if exists test123_par")
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     // Saves dataframe to carbon file
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
 
-    spark.sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
-    spark.sql(s"create table test123_par (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
-    TestUtil.checkAnswer(spark.sql("select count(*) from test123"), spark.sql("select count(*) from test123_par"))
+      spark
+        .sql(s"create table test123_par (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+      TestUtil
+        .checkAnswer(spark.sql("select count(*) from test123"),
+          spark.sql("select count(*) from test123_par"))
+    }
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     spark.sql("drop table if exists test123")
     spark.sql("drop table if exists test123_par")
@@ -368,10 +385,13 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
     // Saves dataframe to carbon file
     df.write.format("parquet").saveAsTable("test123_par")
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark
+        .sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>) using carbon options('sort_columns'='') location '$warehouse1/test_folder/'")
 
-    spark.sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint, doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>) using carbon options('sort_columns'='') location '$warehouse1/test_folder/'")
-    spark.sql(s"insert into test123 select * from test123_par")
-    checkAnswer(spark.sql("select * from test123"), spark.sql(s"select * from test123_par"))
+      spark.sql(s"insert into test123 select * from test123_par")
+      checkAnswer(spark.sql("select * from test123"), spark.sql(s"select * from test123_par"))
+    }
     spark.sql("drop table if exists test123")
     spark.sql("drop table if exists test123_par")
   }
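Note: options('sort_columns'='') declares the carbon table with no sort columns, so the insert ... select from the parquet table loads the data unsorted; the guarded checkAnswer then verifies the round trip against the parquet source.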
@@ -41,8 +41,9 @@ object TestUtil {
     .config("spark.sql.crossJoin.enabled", "true")
     .getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")
-
-  spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+  if (!spark.sparkContext.version.startsWith("2.1")) {
+    spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+  }
 
   def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]):Unit = {
     checkAnswer(df, expectedAnswer.asScala) match {
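The TestUtil change applies the same gate to optimizer-rule registration: CarbonFileIndexReplaceRule is only injected when not on Spark 2.1. For context, experimental.extraOptimizations is the stock SparkSession hook for appending custom logical-plan rules; a minimal self-contained sketch with a stand-in rule (NoopRule is hypothetical, used here instead of the Carbon rule):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Stand-in for CarbonFileIndexReplaceRule: a rule that leaves every plan unchanged.
object NoopRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().appName("sketch").master("local[1]").getOrCreate()
if (!spark.sparkContext.version.startsWith("2.1")) {
  spark.experimental.extraOptimizations = Seq(NoopRule)
}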
