Skip to content

Commit

Permalink
[SPARK-32813][SQL] Get default config of ParquetSource vectorized rea…
Browse files Browse the repository at this point in the history
…der if no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit de0dc52)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
viirya authored and HyukjinKwon committed Sep 9, 2020
1 parent 86b9dd9 commit 4c0f9d8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ case class FileSourceScanExec(

private lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
sqlContext.conf.parquetVectorizedReaderEnabled
} else {
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@

package org.apache.spark.sql.execution

import java.util.concurrent.Executors

import scala.collection.parallel.immutable.ParRange
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.util.ThreadUtils

class SQLExecutionSuite extends SparkFunSuite {

Expand Down Expand Up @@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {

spark.stop()
}

test("SPARK-32813: Table scan should work in different thread") {
val executor1 = Executors.newSingleThreadExecutor()
val executor2 = Executors.newSingleThreadExecutor()
var session: SparkSession = null
SparkSession.cleanupAnyExistingSession()

withTempDir { tempDir =>
try {
val tablePath = tempDir.toString + "/table"
val df = ThreadUtils.awaitResult(Future {
session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()

session.createDataFrame(
session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
StructType(Seq(
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
.write.parquet(tablePath)

session.read.parquet(tablePath)
}(ExecutionContext.fromExecutorService(executor1)), 1.minute)

ThreadUtils.awaitResult(Future {
assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
} finally {
executor1.shutdown()
executor2.shutdown()
session.stop()
}
}
}
}

object SQLExecutionSuite {
Expand Down

0 comments on commit 4c0f9d8

Please sign in to comment.