Commit: finalfix
NamanRastogi committed Mar 25, 2019
1 parent 6402ee2 commit 8467470
Showing 4 changed files with 155 additions and 12 deletions.
@@ -23,7 +23,8 @@ import java.util.{Date, Random}

import org.apache.commons.io.FileUtils
import org.apache.commons.lang.RandomStringUtils
import org.scalatest.BeforeAndAfterAll
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.sql.common.util.DataSourceTestUtil._
import org.apache.spark.util.SparkUtil
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -37,8 +38,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll {

class CreateTableUsingSparkCarbonFileFormatTestCase extends FunSuite with BeforeAndAfterAll {
import spark._
override def beforeAll(): Unit = {
sql("DROP TABLE IF EXISTS sdkOutputTable")
}
@@ -25,10 +25,10 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, Encoder}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.common.util.DataSourceTestUtil._
import org.apache.spark.sql.test.TestQueryExecutor
import org.junit.Assert
import org.scalatest.BeforeAndAfterAll
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -37,7 +37,8 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
import org.apache.carbondata.hadoop.testutil.StoreCreator
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
class SparkCarbonDataSourceTestCase extends FunSuite with BeforeAndAfterAll {
import spark._

val warehouse1 = s"${TestQueryExecutor.projectPath}/integration/spark-datasource/target/warehouse"

@@ -616,7 +617,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
"double, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items " +
"terminated by '$'")
val sourceFile = FileFactory
.getPath(s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/Array.csv")
.getPath(s"$resource" + "../../../../../spark-datasource/src/test/resources/Array.csv")
.toString
sql(s"load data local inpath '$sourceFile' into table array_com_hive")
sql(
@@ -643,7 +644,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
"terminated by '$' map keys terminated by '&'")
val sourceFile = FileFactory
.getPath(
s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/structofarray.csv")
s"$resource" + "../../../../../spark-datasource/src/test/resources/structofarray.csv")
.toString
sql(s"load data local inpath '$sourceFile' into table STRUCT_OF_ARRAY_com_hive")
sql(
@@ -1273,7 +1274,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
" Timestamp,deliveryDate timestamp,deliverycharge double)row format delimited FIELDS " +
"terminated by ',' LINES terminated by '\n' stored as textfile")
val sourceFile = FileFactory
.getPath(s"$resourcesPath" +
.getPath(s"$resource" +
"../../../../../spark-datasource/src/test/resources/vardhandaterestruct.csv")
.toString
sql(s"load data local inpath '$sourceFile' into table fileformat_drop_hive")
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.common.util

import java.io.File

import scala.collection.JavaConverters._


import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.util.sideBySide

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties


object DataSourceTestUtil {

val rootPath = new File(this.getClass.getResource("/").getPath
+ "../../../..").getCanonicalPath
val warehouse1 = FileFactory.getPath(s"$rootPath/integration/spark-datasource/target/warehouse")
.toString
val resource = s"$rootPath/integration/spark-datasource/src/test/resources"
val metaStoreDB1 = s"$rootPath/integration/spark-datasource/target"
val spark = SparkSession
.builder()
.enableHiveSupport()
.master("local")
.config("spark.sql.warehouse.dir", warehouse1)
.config("spark.driver.host", "localhost")
.config("spark.sql.crossJoin.enabled", "true")
.config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
if (!spark.sparkContext.version.startsWith("2.1")) {
spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
}
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "40")

def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): Unit = {
checkAnswer(df, expectedAnswer.asScala)
}

def checkExistence(df: DataFrame, exists: Boolean, keywords: String*): Unit = {
val outputs = df.collect().map(_.mkString).mkString
for (key <- keywords) {
if (exists) {
assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
} else {
assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
}
}
}

def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect())
}

/**
* Runs the plan and makes sure the answer matches the expected result.
* If an exception is thrown while executing the query, or the contents of the DataFrame do
* not match the expected result, the assertion fails with a descriptive error message.
*
* @param df the [[DataFrame]] to be executed
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty

def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
// For BigDecimal type, the Scala type has a better definition of equality test (similar to
// Java's java.math.BigDecimal.compareTo).
// For binary arrays, we convert them to Seq to avoid calling java.util.Arrays.equals for
// the equality test.
val converted: Seq[Row] = answer.map { s =>
Row.fromSeq(s.toSeq.map {
case d: java.math.BigDecimal => BigDecimal(d)
case b: Array[Byte] => b.toSeq
case d: Double =>
if (!d.isInfinite && !d.isNaN) {
var bd = BigDecimal(d)
bd = bd.setScale(5, BigDecimal.RoundingMode.UP)
bd.doubleValue()
}
else {
d
}
case o => o
})
}
if (!isSorted) converted.sortBy(_.toString()) else converted
}

val sparkAnswer = try df.collect().toSeq catch {
case e: Exception =>
val errorMessage =
s"""
|Exception thrown while executing query:
|${ df.queryExecution }
|== Exception ==
|$e
|${ org.apache.spark.sql.catalyst.util.stackTraceToString(e) }
""".stripMargin
assert(false, errorMessage)
return
}

if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
val errorMessage =
s"""
|Results do not match for query:
|${ df.queryExecution }
|== Results ==
|${
sideBySide(
s"== Correct Answer - ${ expectedAnswer.size } ==" +:
prepareAnswer(expectedAnswer).map(_.toString()),
s"== Spark Answer - ${ sparkAnswer.size } ==" +:
prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
}
""".stripMargin
assert(false, errorMessage)
}
}
}
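For context, the suites touched above consume this utility by extending FunSuite, importing DataSourceTestUtil._, and bringing sql from the shared SparkSession into scope with import spark._. Below is a minimal sketch of that pattern; the suite name, table name, and USING carbon clause are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.DataSourceTestUtil._
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class ExampleCarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
  import spark._ // sql(...) from the shared SparkSession in DataSourceTestUtil

  override def beforeAll(): Unit = {
    sql("DROP TABLE IF EXISTS example_table")
  }

  test("create, load and query a table") {
    sql("CREATE TABLE example_table (id INT, name STRING) USING carbon")
    sql("INSERT INTO example_table SELECT 1, 'a'")
    // checkAnswer compares unsorted results order-insensitively and rounds doubles to 5 decimals
    checkAnswer(sql("SELECT id, name FROM example_table"), Seq(Row(1, "a")))
    checkExistence(sql("SHOW TABLES"), true, "example_table")
  }

  override def afterAll(): Unit = {
    sql("DROP TABLE IF EXISTS example_table")
  }
}

This mirrors how CreateTableUsingSparkCarbonFileFormatTestCase and SparkCarbonDataSourceTestCase are wired after this change.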
@@ -82,9 +82,6 @@ object Spark2TestQueryExecutor {
copyResourcesifNotExists(hdfsUrl, s"$integrationPath/spark-common-test/src/test/resources",
s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")
}
if (!spark.sparkContext.version.startsWith("2.1")) {
spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
}
FileFactory.getConfiguration.
set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
spark.sparkContext.setLogLevel("ERROR")
