Skip to content

Commit

Permalink
[CARBONDATA-3154] Fix spark-2.1 test error
Browse files Browse the repository at this point in the history
Spark2.2.1 supports location, but Spark2.1.0 doesn't support location,
supports options(path 'your file path').
So we should change location to options(path ... ) and
create new directory before use "create table" in spark2.1.0.

This closes #2981
  • Loading branch information
xubo245 authored and kunal642 committed Dec 9, 2018
1 parent 382ce43 commit d9f1a81
Showing 1 changed file with 91 additions and 21 deletions.
Expand Up @@ -19,13 +19,14 @@ package org.apache.spark.sql.carbondata.datasource

import java.io.File
import java.util
import java.util.Arrays

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.carbondata.core.datamap.DataMapStoreManager
Expand Down Expand Up @@ -998,9 +999,19 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
"byte, floatfield: float>) " +
s"using carbon location '$path'")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(path)) {
FileFactory.createDirectoryAndSetPermission(path,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
"byte, floatfield: float>) " +
s"using carbon options(path '$path')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
"byte, floatfield: float>) " +
s"using carbon location '$path'")
}
} catch {
case ex: Exception => throw new RuntimeException(ex)
case _ => None
Expand Down Expand Up @@ -1052,13 +1063,33 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
writer.close()
spark.sql("drop table if exists sorted_par")
spark.sql("drop table if exists sort_table")
spark.sql(s"create table sort_table (age int, height double, name string, address string," +
s" salary long, bytefield byte) using carbon location '$path'")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
spark.sql(s"create table sorted_par(age int, height double, name string, address " +
s"string," +
s"salary long, bytefield byte) using parquet location " +
s"'$warehouse1/../warehouse2'")
val path2 = s"$warehouse1/../warehouse2";
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(path)) {
FileFactory.createDirectoryAndSetPermission(path,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table sort_table (age int, height double, name string, address string," +
s" salary long, bytefield byte) using carbon options(path '$path')")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
if (!FileFactory.isFileExist(path2)) {
FileFactory.createDirectoryAndSetPermission(path2,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table sorted_par(age int, height double, name string, address " +
s"string," +
s"salary long, bytefield byte) using parquet options(path " +
s"'$path2')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql(s"create table sort_table (age int, height double, name string, address string," +
s" salary long, bytefield byte) using carbon location '$path'")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
spark.sql(s"create table sorted_par(age int, height double, name string, address " +
s"string," +
s"salary long, bytefield byte) using parquet location " +
s"'$warehouse1/../warehouse2'")
}

(0 to 10).foreach {
i =>
spark.sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " +
Expand Down Expand Up @@ -1098,10 +1129,21 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
spark.sql(s"create table complextable (stringfield string, bytearray " +
s"array<byte>, floatarray array<float>) using carbon " +
s"location " +
s"'$path'")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(path)) {
FileFactory.createDirectoryAndSetPermission(path,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table complextable (stringfield string, bytearray " +
s"array<byte>, floatarray array<float>) using carbon " +
s"options( path " +
s"'$path')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql(s"create table complextable (stringfield string, bytearray " +
s"array<byte>, floatarray array<float>) using carbon " +
s"location " +
s"'$path'")
}
} catch {
case ex: Exception => throw new RuntimeException(ex)
case _ => None
Expand All @@ -1123,9 +1165,20 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
private def createParquetTable {
val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path"))
spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
s"string," +
s"salary long, floatField float, bytefield byte) using parquet location '$path'")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(path)) {
FileFactory.createDirectoryAndSetPermission(path,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
s"string," +
s"salary long, floatField float, bytefield byte) using parquet options(path '$path')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
s"string," +
s"salary long, floatField float, bytefield byte) using parquet location '$path'")
}

(0 to 10).foreach {
i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " +
s"'address$i', ${i*100}, $i.$i, '$i'")
Expand Down Expand Up @@ -1237,7 +1290,15 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
val rowCount = 3
buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount)
spark.sql("drop table if exists carbon_external")
spark.sql(s"create table carbon_external using carbon location '$writerPath'")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(writerPath)) {
FileFactory.createDirectoryAndSetPermission(writerPath,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table carbon_external using carbon options(path '$writerPath')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql(s"create table carbon_external using carbon location '$writerPath'")
}
assert(spark.sql("select * from carbon_external").count() == rowCount)
spark.sql("drop table if exists carbon_external")
}
Expand Down Expand Up @@ -1268,8 +1329,17 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
s"'$path'")
if (SparkUtil.isSparkVersionEqualTo("2.1")) {
if (!FileFactory.isFileExist(path)) {
FileFactory.createDirectoryAndSetPermission(path,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
}
spark.sql(s"create table multi_page (a string, b float, c byte) using carbon options(path " +
s"'$path')")
} else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
s"'$path'")
}
assert(spark.sql("select * from multi_page").count() == 33000)
} catch {
case ex: Exception => throw new RuntimeException(ex)
Expand Down

0 comments on commit d9f1a81

Please sign in to comment.