[SPARK-40245][SQL] Fix FileScan equality check when partition or data filter columns are not read

### What changes were proposed in this pull request?
Unfortunately the fix in apache/spark#31848 was not correct in all cases. When the partition or data filter contains a column that is not in `readSchema()`, the filter normalization in `FileScan.equals()` doesn't work.
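For illustration, a self-contained toy sketch (plain Scala, not the actual patch; all names are made up) of why normalizing the filters only against the read columns breaks the equality check, and why normalizing against attributes that also cover the filter columns restores it:

```scala
// Toy model, not Spark code: attribute identity is an exprId that differs between
// two otherwise identical scans, so filters must be normalized (exprId replaced by
// ordinal) before comparing them.
case class Attr(name: String, exprId: Long)
case class EqualTo(attr: Attr, value: Int)

// Rewrite the attribute to its ordinal in `input`; if the column is missing from
// `input` (e.g. it is not part of readSchema()), the original exprId survives.
def normalize(f: EqualTo, input: Seq[Attr]): EqualTo = {
  val ord = input.indexWhere(_.name == f.attr.name)
  if (ord >= 0) f.copy(attr = Attr(f.attr.name, ord.toLong)) else f
}

object Demo extends App {
  val readSchema = Seq(Attr("value", 10))  // partition column "p" is not read
  val filterA = EqualTo(Attr("p", 11), 1)  // the same predicate in two plans,
  val filterB = EqualTo(Attr("p", 42), 1)  // but with different exprIds

  // Normalizing against the read columns alone leaves the exprIds in place, so not equal.
  assert(normalize(filterA, readSchema) != normalize(filterB, readSchema))

  // Normalizing against attributes that also cover the filter columns: equal again.
  assert(normalize(filterA, readSchema :+ Attr("p", 11)) ==
    normalize(filterB, readSchema :+ Attr("p", 42)))
}
```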

### Why are the changes needed?
To fix `FileScan.equals()` and the scan reuse issues it causes.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added new UT.

Closes #37693 from peter-toth/SPARK-40245-fix-filescan-equals.

Authored-by: Peter Toth <ptoth@cloudera.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
a0x8o committed Aug 29, 2022
1 parent 1c96ffe commit 480a8e6
Showing 52 changed files with 2,142 additions and 339 deletions.
3 changes: 1 addition & 2 deletions core/src/main/resources/error/error-classes.json
@@ -19,8 +19,7 @@
},
"CANNOT_DECODE_URL" : {
"message" : [
"Cannot decode url : <url>.",
"<details>"
"Cannot decode url : <url>."
],
"sqlState" : "42000"
},
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -772,6 +772,7 @@ private[spark] class Executor(
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), t)
}
} finally {
cleanMDCForTask(taskName, mdcProperties)
runningTasks.remove(taskId)
if (taskStarted) {
// This means the task was successfully deserialized, its stageId and stageAttemptId
@@ -788,8 +789,6 @@

private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
try {
// make sure we run the task with the user-specified mdc properties only
MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
// avoid overriding the takName by the user
MDC.put("mdc.taskName", taskName)
@@ -798,6 +797,15 @@
}
}

private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
try {
mdc.foreach { case (key, _) => MDC.remove(key) }
MDC.remove("mdc.taskName")
} catch {
case _: NoSuchFieldError => logInfo("MDC is not supported.")
}
}

/**
* Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
@@ -897,6 +905,7 @@
}
}
} finally {
cleanMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
// Clean up entries in the taskReaperForTask map.
taskReaperForTask.synchronized {
taskReaperForTask.get(taskId).foreach { taskReaperInMap =>
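For context, a minimal sketch of the set/clean pairing that the hunk above establishes (the helper and its name are hypothetical; `org.slf4j.MDC` is the API used above), so that per-task MDC keys do not leak to the next task run on the same pool thread:

```scala
import org.slf4j.MDC

// Hypothetical helper mirroring the pattern above: put the task's MDC keys,
// then always remove them in a finally block so the thread starts clean.
def withTaskMDC[T](taskName: String, mdc: Seq[(String, String)])(body: => T): T = {
  mdc.foreach { case (k, v) => MDC.put(k, v) }
  MDC.put("mdc.taskName", taskName)
  try {
    body
  } finally {
    mdc.foreach { case (k, _) => MDC.remove(k) }
    MDC.remove("mdc.taskName")
  }
}
```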
@@ -1956,6 +1956,13 @@ package object config {
.intConf
.createWithDefault(10)

private[spark] val RDD_LIMIT_INITIAL_NUM_PARTITIONS =
ConfigBuilder("spark.rdd.limit.initialNumPartitions")
.version("3.4.0")
.intConf
.checkValue(_ > 0, "value should be positive")
.createWithDefault(1)

private[spark] val RDD_LIMIT_SCALE_UP_FACTOR =
ConfigBuilder("spark.rdd.limit.scaleUpFactor")
.version("2.1.0")
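A usage sketch for the new knob (only the config key comes from the hunk above; the workload and values are made up): raising `spark.rdd.limit.initialNumPartitions` makes the first `take()` job scan more partitions up front, trading extra work for fewer job launches:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example: scan 100 partitions in the first take() job instead of 1.
val conf = new SparkConf()
  .setAppName("take-initial-partitions-demo")
  .setMaster("local[4]")
  .set("spark.rdd.limit.initialNumPartitions", "100")

val sc = new SparkContext(conf)
val firstRows = sc.parallelize(1 to 1000000, 200).take(50)
sc.stop()
```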
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -25,6 +25,7 @@ import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, JobSubmitter}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{RDD_LIMIT_INITIAL_NUM_PARTITIONS, RDD_LIMIT_SCALE_UP_FACTOR}
import org.apache.spark.util.ThreadUtils

/**
@@ -72,6 +73,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
val results = new ArrayBuffer[T]
val totalParts = self.partitions.length

val scaleUpFactor = Math.max(self.conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)

/*
Recursively triggers jobs to scan partitions until either the requested
number of elements are retrieved, or the partitions to scan are exhausted.
@@ -84,18 +87,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
} else {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
var numPartsToTry = self.conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS)
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate it
// by 50%. We also cap the estimation in the end.
if (results.size == 0) {
numPartsToTry = partsScanned * 4L
// If we didn't find any rows after the previous iteration, multiply by
// limitScaleUpFactor and retry. Otherwise, interpolate the number of partitions we need
// to try, but overestimate it by 50%. We also cap the estimation in the end.
if (results.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max(1,
(1.5 * num * partsScanned / results.size).toInt - partsScanned)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}

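For illustration, a standalone sketch of the partition-growth policy described in the comments above (the parameter names mirror the two configs; this is not the exact Spark code):

```scala
// Toy version of the take()/takeAsync() scan-size policy: start with
// initialNumPartitions, grow by scaleUpFactor while nothing has been found,
// otherwise interpolate with a 50% overestimate and cap the growth.
def nextPartsToTry(
    partsScanned: Long,
    resultsSoFar: Int,
    num: Int,
    initialNumPartitions: Int,
    scaleUpFactor: Int): Long = {
  if (partsScanned == 0L) {
    initialNumPartitions.toLong
  } else if (resultsSoFar == 0) {
    partsScanned * scaleUpFactor
  } else {
    val estimate = Math.max(1L, (1.5 * num * partsScanned / resultsSoFar).toLong - partsScanned)
    Math.min(estimate, partsScanned * scaleUpFactor)
  }
}

// With the defaults (initialNumPartitions = 1, scaleUpFactor = 4):
// nextPartsToTry(0, 0, 50, 1, 4) == 1, then nextPartsToTry(1, 0, 50, 1, 4) == 4.
```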
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -547,7 +547,6 @@ abstract class RDD[T: ClassTag](
s"Fraction must be nonnegative, but got ${fraction}")

withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
@@ -1445,12 +1444,12 @@ abstract class RDD[T: ClassTag](
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1L
var numPartsToTry = conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS)
val left = num - buf.size
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
// If we didn't find any rows after the previous iteration, multiply by
// limitScaleUpFactor and retry. Otherwise, interpolate the number of partitions we need
// to try, but overestimate it by 50%. We also cap the estimation in the end.
if (buf.isEmpty) {
numPartsToTry = partsScanned * scaleUpFactor
} else {
@@ -250,4 +250,13 @@ class BitSet(numBits: Int) extends Serializable {

/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

override def equals(other: Any): Boolean = other match {
case otherSet: BitSet => Arrays.equals(words, otherSet.words)
case _ => false
}

override def hashCode(): Int = {
Arrays.hashCode(words)
}
}
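A brief usage sketch (assuming the `BitSet` shown above): with value-based `equals` and `hashCode`, two bit sets holding the same bits now compare equal, so they can safely take part in equality checks of objects that contain them:

```scala
import org.apache.spark.util.collection.BitSet

val a = new BitSet(64)
val b = new BitSet(64)
Seq(3, 42).foreach { i => a.set(i); b.set(i) }

// Previously only reference equality held; now structurally identical sets are equal.
assert(a == b && a.hashCode == b.hashCode)
```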
39 changes: 38 additions & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd

import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
import java.lang.management.ManagementFactory
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -32,8 +33,9 @@ import org.scalatest.concurrent.Eventually

import org.apache.spark._
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.internal.config.{RDD_LIMIT_INITIAL_NUM_PARTITIONS, RDD_PARALLEL_LISTING_THRESHOLD}
import org.apache.spark.rdd.RDDSuiteUtils._
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.util.{ThreadUtils, Utils}

class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
@@ -1255,6 +1257,41 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
assert(numPartsPerLocation(locations(1)) > 0.4 * numCoalescedPartitions)
}

test("SPARK-40211: customize initialNumPartitions for take") {
val totalElements = 100
val numToTake = 50
val rdd = sc.parallelize(0 to totalElements, totalElements)
import scala.language.reflectiveCalls
val jobCountListener = new SparkListener {
private var count: AtomicInteger = new AtomicInteger(0)
def getCount: Int = count.get
def reset(): Unit = count.set(0)
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
count.incrementAndGet()
}
}
sc.addSparkListener(jobCountListener)
// with default RDD_LIMIT_INITIAL_NUM_PARTITIONS = 1, expecting multiple jobs
rdd.take(numToTake)
sc.listenerBus.waitUntilEmpty()
assert(jobCountListener.getCount > 1)
jobCountListener.reset()
rdd.takeAsync(numToTake).get()
sc.listenerBus.waitUntilEmpty()
assert(jobCountListener.getCount > 1)

// setting RDD_LIMIT_INITIAL_NUM_PARTITIONS to large number(1000), expecting only 1 job
sc.conf.set(RDD_LIMIT_INITIAL_NUM_PARTITIONS, 1000)
jobCountListener.reset()
rdd.take(numToTake)
sc.listenerBus.waitUntilEmpty()
assert(jobCountListener.getCount == 1)
jobCountListener.reset()
rdd.takeAsync(numToTake).get()
sc.listenerBus.waitUntilEmpty()
assert(jobCountListener.getCount == 1)
}

// NOTE
// Below tests calling sc.stop() have to be the last tests in this suite. If there are tests
// running after them and if they access sc those tests will fail as sc is already closed, because
14 changes: 13 additions & 1 deletion docs/sql-ref-functions-udf-hive.md
@@ -52,10 +52,22 @@ SELECT testUDF(value) FROM t;
| 2.0|
| 3.0|
+--------------+

-- Register `UDFSubstr` and use it in Spark SQL.
-- Note that better performance can be achieved if the return types and method parameters use Java primitives.
-- E.g., for UDFSubstr the data processing path is UTF8String <-> Text <-> String; with primitives the UTF8String <-> Text step could be avoided.
CREATE TEMPORARY FUNCTION hive_substr AS 'org.apache.hadoop.hive.ql.udf.UDFSubstr';

select hive_substr('Spark SQL', 1, 5) as value;
+-----+
|value|
+-----+
|Spark|
+-----+
```


An example below uses [GenericUDTFExplode](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java) derived from [GenericUDTF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java).
An example below uses [GenericUDTFExplode](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExplode.java) derived from [GenericUDTF](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTF.java).

```sql
-- Register `GenericUDTFExplode` and use it in Spark SQL
16 changes: 10 additions & 6 deletions hadoop-cloud/pom.xml
@@ -213,8 +213,10 @@
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
<extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
<extra.java.source.dir>src/hadoop-3/main/java</extra.java.source.dir>
<extra.java.testsource.dir>src/hadoop-3/test/java</extra.java.testsource.dir>
<extra.scala.source.dir>src/hadoop-3/main/scala</extra.scala.source.dir>
<extra.scala.testsource.dir>src/hadoop-3/test/scala</extra.scala.testsource.dir>
</properties>

<build>
@@ -240,26 +242,28 @@
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-sources</id>
<id>add-extra-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.source.dir}</source>
<source>${extra.java.source.dir}</source>
<source>${extra.scala.source.dir}</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<id>add-extra-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.testsource.dir}</source>
<source>${extra.java.testsource.dir}</source>
<source>${extra.scala.testsource.dir}</source>
</sources>
</configuration>
</execution>
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.io.cloud.abortable;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

public class AbortableFileSystem extends RawLocalFileSystem {

public static String ABORTABLE_FS_SCHEME = "abortable";

@Override
public URI getUri() {
return URI.create(ABORTABLE_FS_SCHEME + ":///");
}

public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
FSDataOutputStream out = this.create(f, overwrite, bufferSize, replication, blockSize,
progress, permission);
return out;
}

private FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress, FsPermission permission) throws IOException {
if (this.exists(f) && !overwrite) {
throw new FileAlreadyExistsException("File already exists: " + f);
} else {
Path parent = f.getParent();
if (parent != null && !this.mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent.toString());
} else {
return new FSDataOutputStream(this.createOutputStreamWithMode(f, false, permission), null);
}
}
}

@Override
protected OutputStream createOutputStreamWithMode(Path f, boolean append,
FsPermission permission) throws IOException {
return new AbortableOutputStream(f, append, permission);
}

class AbortableOutputStream extends ByteArrayOutputStream
implements Abortable, StreamCapabilities {

private final AtomicBoolean closed = new AtomicBoolean(false);

private Path f;

private boolean append;

private FsPermission permission;

AbortableOutputStream(Path f, boolean append, FsPermission permission) {
this.f = f;
this.append = append;
this.permission = permission;
}

@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}

OutputStream output =
AbortableFileSystem.super.createOutputStreamWithMode(f, append, permission);
writeTo(output);
output.close();
}

@Override
public AbortableResult abort() {
final boolean isAlreadyClosed = closed.getAndSet(true);
return new AbortableResult() {
public boolean alreadyClosed() {
return isAlreadyClosed;
}

public IOException anyCleanupException() {
return null;
}
};
}

@Override
public boolean hasCapability(String capability) {
return capability == CommonPathCapabilities.ABORTABLE_STREAM;
}
}
}
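A hypothetical usage sketch (nothing here is taken from the patch itself; `fs.<scheme>.impl` is the standard Hadoop key for registering a `FileSystem` implementation):

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.cloud.abortable.AbortableFileSystem

// Register the "abortable" scheme so the test filesystem above is used.
val hadoopConf = new Configuration()
hadoopConf.set("fs.abortable.impl", classOf[AbortableFileSystem].getName)

val fs = FileSystem.get(URI.create("abortable:///"), hadoopConf)
val out = fs.create(new Path("/tmp/abortable-demo.txt"))
out.write("hello".getBytes("UTF-8"))
// Nothing is materialized until close(); abort() discards the buffered bytes instead.
out.close()
```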