[CARBONDATA-3029][Test] Fix errors in spark datasource tests in windows env

In the current SparkCarbonDataSourceTest, the path used when creating tables and
writers in a Windows environment looks like '\D:\xx\xx', which causes test
failures such as "java.lang.IllegalArgumentException: Can not create a Path from
an empty string".

This commit fixes the problem by normalizing the path and converting its
separators to unix style using carbon's FileFactory.

This closes #2835
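
As an illustrative sketch of the pattern applied in the diff below (the object name
and the hard-coded Windows-style path are hypothetical, not taken from the tests):

    import org.apache.carbondata.core.datastore.impl.FileFactory

    object PathNormalizationSketch {
      def main(args: Array[String]): Unit = {
        // On Windows, an absolute path string can look like "\D:\xx\xx"; per the commit
        // message, such strings make the tests fail with
        // "java.lang.IllegalArgumentException: Can not create a Path from an empty string".
        val rawPath = "\\D:\\carbondata\\target\\warehouse\\Array.csv" // hypothetical example
        // FileFactory.getPath normalizes the path and converts its separators to unix style,
        // so the result can be embedded safely in SQL such as: load data local inpath '<path>'
        val normalized = FileFactory.getPath(rawPath).toString
        println(normalized)
      }
    }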
xuchuanyin authored and jackylk committed Oct 23, 2018
1 parent d941a5d commit c58ef59
Showing 2 changed files with 12 additions and 9 deletions.
@@ -571,7 +571,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists array_com_hive")
spark.sql(s"drop table if exists array_com")
spark.sql("create table array_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>,ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items terminated by '$'")
spark.sql(s"load data local inpath '$resource/Array.csv' into table array_com_hive")
val sourceFile = FileFactory.getPath(s"$resource/Array.csv").toString
spark.sql(s"load data local inpath '$sourceFile' into table array_com_hive")
spark.sql("create table Array_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, ARRAY_INT array<int>,ARRAY_STRING array<string>,ARRAY_DATE array<timestamp>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) using carbon")
spark.sql("insert into Array_com select * from array_com_hive")
TestUtil.checkAnswer(spark.sql("select * from Array_com order by CUST_ID ASC limit 3"), spark.sql("select * from array_com_hive order by CUST_ID ASC limit 3"))
@@ -583,7 +584,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists STRUCT_OF_ARRAY_com_hive")
spark.sql(s"drop table if exists STRUCT_OF_ARRAY_com")
spark.sql(" create table STRUCT_OF_ARRAY_com_hive (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int,CHECK_DATE: timestamp ,SNo: array<int>,sal1: array<double>,state: array<string>,date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT float, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items terminated by '$' map keys terminated by '&'")
spark.sql(s"load data local inpath '$resource/structofarray.csv' into table STRUCT_OF_ARRAY_com_hive")
val sourceFile = FileFactory.getPath(s"$resource/structofarray.csv").toString
spark.sql(s"load data local inpath '$sourceFile' into table STRUCT_OF_ARRAY_com_hive")
spark.sql("create table STRUCT_OF_ARRAY_com (CUST_ID string, YEAR int, MONTH int, AGE int, GENDER string, EDUCATED string, IS_MARRIED string, STRUCT_OF_ARRAY struct<ID: int,CHECK_DATE: timestamp,SNo: array<int>,sal1: array<double>,state: array<string>,date1: array<timestamp>>,CARD_COUNT int,DEBIT_COUNT int, CREDIT_COUNT int, DEPOSIT double, HQ_DEPOSIT double) using carbon")
spark.sql(" insert into STRUCT_OF_ARRAY_com select * from STRUCT_OF_ARRAY_com_hive")
TestUtil.checkAnswer(spark.sql("select * from STRUCT_OF_ARRAY_com order by CUST_ID ASC"), spark.sql("select * from STRUCT_OF_ARRAY_com_hive order by CUST_ID ASC"))
@@ -628,7 +630,8 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
PDP_ADDR string,APN_NI string,APN_OI string,CARD_ID string,TIME_OUT int,LOGIN_TIME timestamp,USER_IMPU string,OPPO_IMPU string,USER_LAST_IMPI string,
USER_CURR_IMPI string,SUPSERVICE_TYPE bigint,SUPSERVICE_TYPE_SUBCODE bigint,SMS_CENTERNUM string,USER_LAST_LONGITUDE double,USER_LAST_LATITUDE double,
USER_LAST_MSC string,USER_LAST_BASE_STATION string,LOAD_ID bigint,P_CAP_TIME string) ROW format delimited FIELDS terminated by '|'""".stripMargin)
spark.sql(s"load data local inpath '$resource/j2.csv' into table h_jin")
val sourceFile = FileFactory.getPath(s"$resource/j2.csv").toString
spark.sql(s"load data local inpath '$sourceFile' into table h_jin")
spark.sql(s"""create table c_jin(RECORD_ID string,
CDR_ID string,LOCATION_CODE int,SYSTEM_ID string,
CLUE_ID string,HIT_ELEMENT string,CARRIER_CODE string,CAP_TIME date,
@@ -970,7 +973,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {

test("test struct of float type and byte type") {
import scala.collection.JavaConverters._
val path = new File(warehouse1+"/sdk1").getAbsolutePath
val path = FileFactory.getPath(warehouse1+"/sdk1").toString
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
spark.sql("drop table if exists complextable")
val fields = List(new StructField
@@ -1014,8 +1017,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
}

test("test bytefield as sort column") {
-import scala.collection.JavaConverters._
-val path = new File(warehouse1+"/sdk1").getAbsolutePath
+val path = FileFactory.getPath(warehouse1+"/sdk1").toString
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
var fields: Array[Field] = new Array[Field](8)
// same column name, but name as boolean type
@@ -1070,7 +1072,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {

test("test array of float type and byte type") {
import scala.collection.JavaConverters._
val path = new File(warehouse1+"/sdk1").getAbsolutePath
val path = FileFactory.getPath(warehouse1+"/sdk1").toString
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
spark.sql("drop table if exists complextable")
val structType =
@@ -1239,7 +1241,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
}

test("test byte and float for multiple pages") {
val path = new File(warehouse1+"/sdk1").getAbsolutePath
val path = FileFactory.getPath(warehouse1+"/sdk1").toString
FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
spark.sql("drop table if exists multi_page")
var fields: Array[Field] = new Array[Field](8)
@@ -31,14 +31,15 @@ import org.apache.spark.sql.catalyst.util.sideBySide
import org.junit.Assert

import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.sdk.file.CarbonWriter

object TestUtil {

val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val warehouse1 = s"$rootPath/integration/spark-datasource/target/warehouse"
val warehouse1 = FileFactory.getPath(s"$rootPath/integration/spark-datasource/target/warehouse").toString
val resource = s"$rootPath/integration/spark-datasource/src/test/resources"
val metastoredb1 = s"$rootPath/integration/spark-datasource/target"
val spark = SparkSession
