Commit 6389d80

Merge remote-tracking branch 'apache-github/master' into SPARK-23092

tdas committed Feb 1, 2018
2 parents 478ad17 + 032c11b
Showing 90 changed files with 814 additions and 264 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -2090,8 +2090,8 @@ setMethod("selectExpr",
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
-#' @param col a Column expression (which must refer only to this DataFrame), or an atomic vector in
-#' the length of 1 as literal value.
+#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
+#' vector in the length of 1 as literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
-channel.write(buf);
+while (buf.hasRemaining()) {
+  channel.write(buf);
+}
}

@Override
5 changes: 4 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -244,7 +244,10 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
for (i <- 0 until testOutputCopies) {
// Shift values by i so that they're different in the output
val alteredOutput = testOutput.map(b => (b + i).toByte)
-channel.write(ByteBuffer.wrap(alteredOutput))
+val buffer = ByteBuffer.wrap(alteredOutput)
+while (buffer.hasRemaining) {
+  channel.write(buffer)
+}
}
channel.close()
file.close()
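
Both write loops above guard against partial writes: `WritableByteChannel.write()` may consume fewer bytes than remain in the buffer, so the caller has to retry until the buffer is drained. A minimal Scala sketch of that pattern, using an assumed helper name `writeFully`:

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Sketch only: loop until the buffer is fully consumed, since a single
// write() call is allowed to write only part of the remaining bytes.
def writeFully(channel: WritableByteChannel, buf: ByteBuffer): Unit = {
  while (buf.hasRemaining) {
    channel.write(buf)
  }
}
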
21 changes: 19 additions & 2 deletions docs/ml-guide.md
@@ -108,7 +108,13 @@ and the migration guide below will explain all changes between releases.

### Breaking changes

-There are no breaking changes.
+* The class and trait hierarchy for logistic regression model summaries was changed to be cleaner
+and better accommodate the addition of the multi-class summary. This is a breaking change for user
+code that casts a `LogisticRegressionTrainingSummary` to a
+`BinaryLogisticRegressionTrainingSummary`. Users should instead use the `model.binarySummary`
+method. See [SPARK-17139](https://issues.apache.org/jira/browse/SPARK-17139) for more detail
+(_note_ this is an `Experimental` API). This _does not_ affect the Python `summary` method, which
+will still work correctly for both multinomial and binary cases.
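
For illustration, a minimal Scala sketch of the recommended accessor; `training` is an assumed DataFrame of (label, features) rows:

import org.apache.spark.ml.classification.LogisticRegression

// Sketch: fit a binary model, then read binary-specific metrics.
val model = new LogisticRegression().fit(training)

// Instead of casting model.summary to BinaryLogisticRegressionTrainingSummary,
// use the dedicated accessor introduced in 2.3:
val binarySummary = model.binarySummary
println(s"areaUnderROC = ${binarySummary.areaUnderROC}")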

### Deprecations and changes of behavior

@@ -123,8 +129,19 @@ new [`OneHotEncoderEstimator`](ml-features.html#onehotencoderestimator)
**Changes of behavior**

* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
-We are now setting the default parallelism used in `OneVsRest` to be 1 (i.e. serial). In 2.2 and
+The default parallelism used in `OneVsRest` is now set to 1 (i.e. serial). In `2.2` and
earlier versions, the level of parallelism was set to the default threadpool size in Scala.
+* [SPARK-22156](https://issues.apache.org/jira/browse/SPARK-22156):
+The learning rate update for `Word2Vec` was incorrect when `numIterations` was set greater than
+`1`. This will cause training results to be different between `2.3` and earlier versions.
+* [SPARK-21681](https://issues.apache.org/jira/browse/SPARK-21681):
+Fixed an edge case bug in multinomial logistic regression that resulted in incorrect coefficients
+when some features had zero variance.
+* [SPARK-16957](https://issues.apache.org/jira/browse/SPARK-16957):
+Tree algorithms now use mid-points for split values. This may change results from model training.
+* [SPARK-14657](https://issues.apache.org/jira/browse/SPARK-14657):
+Fixed an issue where the features generated by `RFormula` without an intercept were inconsistent
+with the output in R. This may change results from model training in this scenario.
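
As a concrete example for the SPARK-21027 item above, concurrent per-class training can still be requested explicitly; a minimal Scala sketch, assuming a DataFrame `training` of (label, features) rows and an illustrative parallelism value:

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

// Sketch: the default is now parallelism = 1 (serial); opt back in to
// concurrent base-model training by setting the parameter explicitly.
val ovr = new OneVsRest()
  .setClassifier(new LogisticRegression())
  .setParallelism(4)  // illustrative value
val ovrModel = ovr.fit(training)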

## Previous Spark versions

@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRo
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}

/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
@@ -31,8 +31,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType

/**
@@ -74,7 +74,7 @@ private[feature] trait RFormulaBase extends HasFeaturesCol with HasLabelCol with
* @group param
*/
@Since("2.3.0")
-final override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
+override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
"How to handle invalid data (unseen or NULL values) in features and label column of string " +
"type. Options are 'skip' (filter out rows with invalid data), error (throw an error), " +
"or 'keep' (put invalid data in a special additional bucket, at index numLabels).",
@@ -645,7 +645,7 @@ class LinearRegressionModel private[ml] (
extends RegressionModel[Vector, LinearRegressionModel]
with LinearRegressionParams with MLWritable {

-def this(uid: String, coefficients: Vector, intercept: Double) =
+private[ml] def this(uid: String, coefficients: Vector, intercept: Double) =
this(uid, coefficients, intercept, 1.0)

private var trainingSummary: Option[LinearRegressionTrainingSummary] = None
2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -1829,7 +1829,7 @@ def withColumn(self, colName, col):
Returns a new :class:`DataFrame` by adding a column or replacing the
existing column that has the same name.
-The column expression must be an expression over this dataframe; attempting to add
+The column expression must be an expression over this DataFrame; attempting to add
a column from some other dataframe will raise an error.
:param colName: string, name of the new column.
10 changes: 9 additions & 1 deletion python/pyspark/sql/session.py
@@ -213,7 +213,12 @@ def __init__(self, sparkContext, jsparkSession=None):
self._jsc = self._sc._jsc
self._jvm = self._sc._jvm
if jsparkSession is None:
-jsparkSession = self._jvm.SparkSession(self._jsc.sc())
+if self._jvm.SparkSession.getDefaultSession().isDefined() \
+        and not self._jvm.SparkSession.getDefaultSession().get() \
+        .sparkContext().isStopped():
+    jsparkSession = self._jvm.SparkSession.getDefaultSession().get()
+else:
+    jsparkSession = self._jvm.SparkSession(self._jsc.sc())
self._jsparkSession = jsparkSession
self._jwrapped = self._jsparkSession.sqlContext()
self._wrapped = SQLContext(self._sc, self, self._jwrapped)
@@ -225,6 +230,7 @@ def __init__(self, sparkContext, jsparkSession=None):
if SparkSession._instantiatedSession is None \
or SparkSession._instantiatedSession._sc._jsc is None:
SparkSession._instantiatedSession = self
+self._jvm.SparkSession.setDefaultSession(self._jsparkSession)

def _repr_html_(self):
return """
@@ -759,6 +765,8 @@ def stop(self):
"""Stop the underlying :class:`SparkContext`.
"""
self._sc.stop()
+# We should clean the default session up. See SPARK-23228.
+self._jvm.SparkSession.clearDefaultSession()
SparkSession._instantiatedSession = None

@since(2.0)
34 changes: 30 additions & 4 deletions python/pyspark/sql/tests.py
@@ -69,7 +69,7 @@
from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
from pyspark.sql.types import _merge_type
-from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
+from pyspark.tests import QuietTest, ReusedPySparkTestCase, PySparkTestCase, SparkSubmitTests
from pyspark.sql.functions import UserDefinedFunction, sha2, lit
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -2925,6 +2925,32 @@ def test_sparksession_with_stopped_sparkcontext(self):
sc.stop()


+class SparkSessionTests(PySparkTestCase):
+
+    # This test is separate because it's closely related with session's start and stop.
+    # See SPARK-23228.
+    def test_set_jvm_default_session(self):
+        spark = SparkSession.builder.getOrCreate()
+        try:
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isDefined())
+        finally:
+            spark.stop()
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isEmpty())
+
+    def test_jvm_default_session_already_set(self):
+        # Here, we assume there is the default session already set in JVM.
+        jsession = self.sc._jvm.SparkSession(self.sc._jsc.sc())
+        self.sc._jvm.SparkSession.setDefaultSession(jsession)
+
+        spark = SparkSession.builder.getOrCreate()
+        try:
+            self.assertTrue(spark._jvm.SparkSession.getDefaultSession().isDefined())
+            # The session should be the same with the existing one.
+            self.assertTrue(jsession.equals(spark._jvm.SparkSession.getDefaultSession().get()))
+        finally:
+            spark.stop()


class UDFInitializationTests(unittest.TestCase):
def tearDown(self):
if SparkSession._instantiatedSession is not None:
@@ -4504,19 +4530,19 @@ def test_unsupported_types(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType

with QuietTest(self.sc):
-with self.assertRaisesRegex(NotImplementedError, 'not supported'):
+with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
@pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
def mean_and_std_udf(v):
return [v.mean(), v.std()]

with QuietTest(self.sc):
-with self.assertRaisesRegex(NotImplementedError, 'not supported'):
+with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
@pandas_udf('mean double, std double', PandasUDFType.GROUPED_AGG)
def mean_and_std_udf(v):
return v.mean(), v.std()

with QuietTest(self.sc):
-with self.assertRaisesRegex(NotImplementedError, 'not supported'):
+with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
@pandas_udf(MapType(DoubleType(), DoubleType()), PandasUDFType.GROUPED_AGG)
def mean_and_std_udf(v):
return {v.mean(): v.std()}
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -718,7 +719,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
logError("User class threw exception: " + cause, cause)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
"User class threw exception: " + cause)
"User class threw exception: " + StringUtils.stringifyException(cause))
}
sparkContextPromise.tryFailure(e.getCause())
} finally {
@@ -1493,7 +1493,7 @@ class Analyzer(
// to push down this ordering expression and can reference the original aggregate
// expression instead.
val needsPushDown = ArrayBuffer.empty[NamedExpression]
-val evaluatedOrderings = resolvedAliasedOrdering.zip(sortOrder).map {
+val evaluatedOrderings = resolvedAliasedOrdering.zip(unresolvedSortOrders).map {
case (evaluated, order) =>
val index = originalAggExprs.indexWhere {
case Alias(child, _) => child semanticEquals evaluated.child
@@ -2038,7 +2038,11 @@ class Analyzer(
WindowExpression(wf, s.copy(frameSpecification = wf.frame))
case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame))
if e.resolved =>
-val frame = SpecifiedWindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true)
+val frame = if (o.nonEmpty) {
+  SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+} else {
+  SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+}
we.copy(windowSpec = s.copy(frameSpecification = frame))
}
}
@@ -265,27 +265,6 @@ case class SpecifiedWindowFrame(
}
}

-object SpecifiedWindowFrame {
-  /**
-   * @param hasOrderSpecification If the window spec has order by expressions.
-   * @param acceptWindowFrame If the window function accepts user-specified frame.
-   * @return the default window frame.
-   */
-  def defaultWindowFrame(
-      hasOrderSpecification: Boolean,
-      acceptWindowFrame: Boolean): SpecifiedWindowFrame = {
-    if (hasOrderSpecification && acceptWindowFrame) {
-      // If order spec is defined and the window function supports user specified window frames,
-      // the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
-      SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
-    } else {
-      // Otherwise, the default frame is
-      // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
-      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
-    }
-  }
-}

case class UnresolvedWindowExpression(
child: Expression,
windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable {
@@ -46,18 +46,27 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
}

plan.transform {
-case Except(left, right) if isEligible(left, right) =>
-  Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
+case e @ Except(left, right) if isEligible(left, right) =>
+  val newCondition = transformCondition(left, skipProject(right))
+  newCondition.map { c =>
+    Distinct(Filter(Not(c), left))
+  }.getOrElse {
+    e
+  }
}
}

-private def transformCondition(left: LogicalPlan, right: LogicalPlan): Expression = {
+private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = {
val filterCondition =
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition

val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap

-filterCondition.transform { case a : AttributeReference => attributeNameMap(a.name) }
+if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) {
+  Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) })
+} else {
+  None
+}
}

// TODO: This can be further extended in the future.
@@ -375,6 +375,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

+val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize")
+  .doc("The number of rows to include in a parquet vectorized reader batch. The number should " +
+    "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+  .intConf
+  .createWithDefault(4096)

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
@@ -398,6 +404,12 @@
.booleanConf
.createWithDefault(true)

+val ORC_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.orc.columnarReaderBatchSize")
+  .doc("The number of rows to include in a orc vectorized reader batch. The number should " +
+    "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+  .intConf
+  .createWithDefault(4096)

val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
.doc("Whether or not to copy the ORC columnar batch to Spark columnar batch in the " +
"vectorized ORC reader.")
@@ -1250,10 +1262,14 @@ class SQLConf extends Serializable with Logging {

def orcVectorizedReaderEnabled: Boolean = getConf(ORC_VECTORIZED_READER_ENABLED)

+def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE)

def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)

def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)

+def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)

def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)

def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
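
The two batch-size settings introduced above (`spark.sql.parquet.columnarReaderBatchSize` and `spark.sql.orc.columnarReaderBatchSize`) are ordinary SQL configs; a minimal usage sketch, assuming an active `SparkSession` named `spark` and illustrative values:

// Sketch: shrink the vectorized reader batches, e.g. for very wide schemas
// where the 4096-row default could use too much memory per batch.
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", "1024")
spark.conf.set("spark.sql.orc.columnarReaderBatchSize", "1024")
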
@@ -168,6 +168,21 @@ class ReplaceOperatorSuite extends PlanTest {
comparePlans(optimized, correctAnswer)
}

test("replace Except with Filter when only right filter can be applied to the left") {
val table = LocalRelation(Seq('a.int, 'b.int))
val left = table.where('b < 1).select('a).as("left")
val right = table.where('b < 3).select('a).as("right")

val query = Except(left, right)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Aggregate(left.output, right.output,
Join(left, right, LeftAnti, Option($"left.a" <=> $"right.a"))).analyze

comparePlans(optimized, correctAnswer)
}

test("replace Distinct with Aggregate") {
val input = LocalRelation('a.int, 'b.int)
