Skip to content

Commit

Permalink
[SPARK-18368] Fix regexp_replace with task serialization.
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This makes the result value both transient and lazy, so that if the RegExpReplace object is initialized then serialized, `result: StringBuffer` will be correctly initialized.

## How was this patch tested?

* Verified that this patch fixed the query that found the bug.
* Added a test case that fails without the fix.

Author: Ryan Blue <blue@apache.org>

Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace.
  • Loading branch information
rdblue authored and rxin committed Nov 9, 2016
1 parent 4afa39e commit b9192bb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Expand Up @@ -230,7 +230,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
@transient private var lastReplacement: String = _
@transient private var lastReplacementInUTF8: UTF8String = _
// result buffer write by Matcher
@transient private val result: StringBuffer = new StringBuffer
@transient private lazy val result: StringBuffer = new StringBuffer

override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
if (!p.equals(lastRegex)) {
Expand Down
Expand Up @@ -22,7 +22,8 @@ import org.scalactic.TripleEqualsSupport.Spread
import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
Expand All @@ -43,13 +44,15 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {

protected def checkEvaluation(
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
val serializer = new JavaSerializer(new SparkConf()).newInstance
val expr: Expression = serializer.deserialize(serializer.serialize(expression))
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow)
}
checkEvaluationWithOptimization(expression, catalystValue, inputRow)
checkEvaluationWithOptimization(expr, catalystValue, inputRow)
}

/**
Expand Down

0 comments on commit b9192bb

Please sign in to comment.