Conversation

@rajesh7738

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

zsxwing and others added 30 commits March 3, 2017 19:09
…esn't recover the log level

## What changes were proposed in this pull request?

"DataFrameCallbackSuite.execute callback functions when a DataFrame action failed" sets the log level to "fatal" but doesn't recover it. Hence, tests running after it won't output any logs except fatal logs.

This PR uses `testQuietly` instead to avoid changing the log level.
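
For context, here is a minimal sketch (assuming the log4j 1.x API that Spark 2.x ships with) of what a quiet-test wrapper such as `testQuietly` does: it lowers the log level only for the test body and always restores the previous level afterwards.

```scala
import org.apache.log4j.{Level, LogManager}

// Run `body` with logging silenced to FATAL, then restore the previous root log level.
def quietly[T](body: => T): T = {
  val rootLogger = LogManager.getRootLogger
  val oldLevel = rootLogger.getLevel
  rootLogger.setLevel(Level.FATAL)
  try body finally rootLogger.setLevel(oldLevel)
}
```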

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17156 from zsxwing/SPARK-19816.

(cherry picked from commit fbc4058)
Signed-off-by: Yin Huai <yhuai@databricks.com>
… not filter checkpointFilesOfLatestTime with the PATH string.

## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/

```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code
passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
Last failure message: 8 did not equal 2.
	at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
	at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
	at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite
.scala:172)
	at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
```

the check condition is:

```
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
     _.toString.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
```

the path string may contain the `clock.getTimeMillis.toString`, like `3500` :

```
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                       ▲▲▲▲
```

So we should check only the file name, not the whole path.
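
A hedged sketch of the corrected check, reusing the names from the snippet above: matching on the file name alone means timestamp-like digits in the directory part of the path can no longer produce false positives.

```scala
// Compare against the checkpoint file's name only, not its full path.
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { path =>
  path.getName.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice per batch interval, so both should be present.
assert(checkpointFilesOfLatestTime.size === 2)
```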

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17167 from uncleGen/flaky-CheckpointSuite.

(cherry picked from commit 207067e)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
…h queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka.
### Streaming Kafka Sink
- When addBatch is called:
  - If batchId is greater than the last written batch:
    - Write the batch to Kafka.
    - The topic is taken from the record, if present, or from the topic option, which overrides the topic in the record.
  - Otherwise, ignore the batch.

### Batch Kafka Sink
- KafkaSourceProvider will implement CreatableRelationProvider.
- CreatableRelationProvider#createRelation will write the passed-in DataFrame to Kafka.
- The topic is taken from the record, if present, or from the topic option, which overrides the topic in the record.
- Save modes Append and ErrorIfExists are supported with identical semantics. Other save modes result in an AnalysisException.

tdas zsxwing

## How was this patch tested?

### The following unit tests will be included
- write to stream with topic field: valid stream write with data that includes an existing topic in the schema
- write structured streaming aggregation w/o topic field, with default topic: valid stream write with data that does not include a topic field, but the configuration includes a default topic
- write data with bad schema: various cases of writing data that does not conform to a proper schema e.g., 1. no topic field or default topic, and 2. no value field
- write data with valid schema but wrong types: data with a complete schema but wrong types e.g., key and value types are integers.
- write to non-existing topic: write a stream to a topic that does not exist in Kafka, which has been configured to not auto-create topics.
- write batch to kafka: simple write batch to Kafka, which goes through the same code path as streaming scenario, so validity checks will not be redone here.

### Examples
```scala
// Structured Streaming
val writer = inputStringStream.map(s => s.get(0).toString.getBytes()).toDF("value")
 .selectExpr("value as key", "value as value")
 .writeStream
 .format("kafka")
 .option("checkpointLocation", checkpointDir)
 .outputMode(OutputMode.Append)
 .option("kafka.bootstrap.servers", brokerAddress)
 .option("topic", topic)
 .queryName("kafkaStream")
 .start()

// Batch
val df = spark
 .sparkContext
 .parallelize(Seq("1", "2", "3", "4", "5"))
 .map(v => (topic, v))
 .toDF("topic", "value")

df.write
 .format("kafka")
 .option("kafka.bootstrap.servers",brokerAddress)
 .option("topic", topic)
 .save()
```
Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17043 from tcondie/kafka-writer.
## What changes were proposed in this pull request?

Cast the output of `TimestampType.toInternal` to long to allow for proper Timestamp creation in DataFrames near the epoch.

## How was this patch tested?

Added a new test that fails without the change.

dongjoon-hyun davies Mind taking a look?

The contribution is my original work and I license the work to the project under the project’s open source license.

Author: Jason White <jason.white@shopify.com>

Closes #16896 from JasonMWhite/SPARK-19561.

(cherry picked from commit 6f46846)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
Add parentheses so that both lines form a single statement; also add
a log message so that the issue becomes more explicit if it shows up
again.

Tested manually with integration test that exercises the feature.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #17198 from vanzin/SPARK-19857.

(cherry picked from commit 8e41c2e)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we just pick up the first column which has a watermark, it may be unexpected.
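
A hedged usage sketch of the intended semantics; `eventsStream` (a streaming Dataset with an `eventTime` timestamp column) and the delay values are illustrative.

```scala
// With this change, the second withWatermark call becomes the effective watermark,
// rather than the first watermarked column silently winning.
val watermarked = eventsStream
  .withWatermark("eventTime", "10 seconds")
  .withWatermark("eventTime", "1 minute") // this delay now overrides the earlier one
```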

## How was this patch tested?

The new test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.

(cherry picked from commit d8830c5)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?
The `keyword_only` decorator in PySpark is not thread-safe.  It writes kwargs to a static class variable in the decorator, which is then retrieved later in the class method as `_input_kwargs`.  If multiple threads are constructing the same class with different kwargs, it becomes a race condition to read from the static class variable before it's overwritten.  See [SPARK-19348](https://issues.apache.org/jira/browse/SPARK-19348) for reproduction code.

This change will write the kwargs to a member variable so that multiple threads can operate on separate instances without the race condition.  It does not protect against multiple threads operating on a single instance, but that is better left to the user to synchronize.

## How was this patch tested?
Added new unit tests for using the keyword_only decorator and a regression test that verifies `_input_kwargs` can be overwritten from different class instances.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #17193 from BryanCutler/pyspark-keyword_only-threadsafe-SPARK-19348-2_1.
Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but the inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library.

This patch changes us to always reflect based on a mirror created using the context classloader.
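
A minimal sketch of the approach using the standard Scala reflection API (not a quote from the patch):

```scala
import scala.reflect.runtime.{universe => ru}

// Build the mirror from the thread's context classloader so that classes defined in the
// REPL or a user library (e.g. the A in Seq[A]) are resolvable, instead of relying on the
// mirror carried by the incoming TypeTag.
val mirror: ru.Mirror = ru.runtimeMirror(Thread.currentThread().getContextClassLoader)
```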

Author: Michael Armbrust <michael@databricks.com>

Closes #17201 from marmbrus/replSeqEncoder.

(cherry picked from commit 314e48a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
… in combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old the files can be, relative to the latest file that has been seen. This is used to limit the files that need to be remembered as "processed". Files older than the latest processed files are ignored. This value is 7 days by default.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations:
1) latestFirst = false - Since files are processed in order, there won't be any unprocessed file older than the latest processed file. All files will be processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is not set, then all old files get processed in the first batch, and so no file is left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch processes the latest X files. That sets the threshold to (latest file - maxFileAge), so files older than this threshold will never be considered for processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.
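
A hedged sketch of the guard as a free-standing helper; the parameter names are illustrative and not copied from the source.

```scala
// When both latestFirst and maxFilesPerTrigger are set, disable the age cutoff entirely;
// otherwise keep the usual "latest seen timestamp minus maxFileAge" threshold.
def minAllowedFileTimestamp(
    latestFirst: Boolean,
    maxFilesPerTrigger: Option[Int],
    latestTimestamp: Long,
    maxFileAgeMs: Long): Long = {
  val ignoreFileAge = latestFirst && maxFilesPerTrigger.isDefined
  if (ignoreFileAge) Long.MinValue else latestTimestamp - maxFileAgeMs
}
```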

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17153 from brkyvz/maxFileAge.

(cherry picked from commit a3648b5)
Signed-off-by: Burak Yavuz <brkyvz@gmail.com>
…isException is raised from analyzer.

## What changes were proposed in this pull request?
In general we have a checkAnalysis phase which validates the logical plan and throws AnalysisException on semantic errors. However, we can also throw AnalysisException from a few analyzer rules, like ResolveSubquery.

I found that we fire up the analyzer rules twice for queries that throw AnalysisException from one of the analyzer rules. This is a very minor fix; we don't strictly have to fix it. I just got confused seeing the rules fire twice when I was not expecting it.

## How was this patch tested?

Tested manually.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #17214 from dilipbiswal/analyis_twice.

(cherry picked from commit d809cee)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

The API docs should not include the "org.apache.spark.sql.internal" package because they are internal private APIs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17217 from zsxwing/SPARK-19874.

(cherry picked from commit 029e40b)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
…d one.

## What changes were proposed in this pull request?

A follow up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17221 from uncleGen/SPARK-19859.

(cherry picked from commit eeb1d6d)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

Add handling of input of type `Int` for dataType `TimestampType` to `EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger than MAX_INT to Long, which are handled correctly already, but values between MIN_INT and MAX_INT are serialized to Int.

These range limits correspond to roughly half an hour on either side of the epoch. As a result, PySpark doesn't allow TimestampType values to be created in this range.
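
A hedged sketch of the idea behind the fix; the helper name is illustrative, and the real change lives in the type-conversion logic of `EvaluatePython.scala`.

```scala
// Py4J may deliver the timestamp's internal value as either Int or Long,
// so widen Int to Long before treating it as a Catalyst timestamp.
def toCatalystTimestamp(value: Any): Long = value match {
  case i: Int  => i.toLong // values within ~30 minutes of the epoch arrive as Int
  case l: Long => l
  case other   => throw new IllegalArgumentException(s"unexpected timestamp value: $other")
}
```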

Alternatives attempted: patching the `TimestampType.toInternal` function to cast return values to `long`, so Py4J would always serialize them to Scala Long. Python3 does not have a `long` type, so this approach failed on Python3.

## How was this patch tested?

Added a new PySpark-side test that fails without the change.

The contribution is my original work and I license the work to the project under the project’s open source license.

Resubmission of #16896. The original PR didn't go through Jenkins and broke the build. davies dongjoon-hyun

cloud-fan Could you kick off a Jenkins run for me? It passed everything for me locally, but it's possible something has changed in the last few weeks.

Author: Jason White <jason.white@shopify.com>

Closes #17200 from JasonMWhite/SPARK-19561.

(cherry picked from commit 206030b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

A `watermark` delay should not be negative. Such a value is invalid, so check it before the query actually runs.
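
A hedged sketch of the validation; the method name is illustrative.

```scala
// Fail fast on a negative watermark delay instead of letting it reach execution.
def validateWatermarkDelay(delayMs: Long): Unit =
  require(delayMs >= 0, s"delay threshold ($delayMs ms) should not be negative.")
```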

## How was this patch tested?

Added a new unit test.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.

(cherry picked from commit 30b18e6)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

Fix the `throw new IllegalStateException` if statement part.

## How was this patch tested?

Regression test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17228 from brkyvz/kafka-cause-fix.

(cherry picked from commit 82138e0)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?

We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown.

## How was this patch tested?

Current tests that throw exceptions at runtime will finish faster as a result of this update.

zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17231 from tcondie/kafka-writer.

(cherry picked from commit 501b711)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
In Spark SQL, the map type can't be used in equality tests/comparisons, and `Intersect`/`Except`/`Distinct` do need equality tests for all columns, so we should not allow the map type in `Intersect`/`Except`/`Distinct`.
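
A hedged usage sketch of the expected behavior (assumes `spark.implicits._` is in scope):

```scala
val left  = Seq(Tuple1(Map(1 -> "a"))).toDF("m")
val right = Seq(Tuple1(Map(1 -> "b"))).toDF("m")
// With this change the analyzer should reject the set operation,
// since map columns do not support equality checks.
left.intersect(right)
```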

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17236 from cloud-fan/map.

(cherry picked from commit fb9beda)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Add a new configuration option that allows Spark SQL to infer a case-sensitive schema from a Hive Metastore table's data files when a case-sensitive schema can't be read from the table properties.

- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when schema can't
  successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
  false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() to
  HiveMetastoreCatalog.mergeWithMetastoreSchema
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite

[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)

The tests in ```HiveSchemaInferenceSuite``` should verify that schema inference is working as expected. ```ExternalCatalogSuite``` has also been extended to cover the new ```alterTableSchema()``` API.
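
A hedged usage sketch; the table name is hypothetical, and the mode names (`INFER_AND_SAVE`, `INFER_ONLY`, `NEVER_INFER`) are assumed rather than quoted from the patch.

```scala
// Ask Spark to infer a case-sensitive schema from the data files and persist it
// back to the table properties when the Metastore doesn't already carry one.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
val df = spark.table("mixed_case_parquet_table") // columns keep their original casing
```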

Author: Budde <budde@amazon.com>

Closes #17229 from budde/SPARK-19611-2.1.
## What changes were proposed in this pull request?

- SS python example: `TypeError: 'xxx' object is not callable`
- some other doc issue.

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17257 from uncleGen/docs-ss-python.

(cherry picked from commit e29a74d)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…e SpecificOffsets

When using the KafkaSource with Structured Streaming, consumer assignments are not what the user expects if startingOffsets is set to an explicit set of topics/partitions in JSON where the topic(s) happen to have uppercase characters. When startingOffsets is constructed, the original string value from the options is lower-cased to make matching on "earliest" and "latest" case-insensitive. However, the lower-cased JSON is passed to SpecificOffsets for the terminal condition, so topic names may not be what the user intended by the time assignments are made with the underlying KafkaConsumer.

KafkaSourceProvider.scala:
```
val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    case Some("latest") => LatestOffsets
    case Some("earliest") => EarliestOffsets
    case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    case None => LatestOffsets
  }
```
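
A hedged sketch of the intended fix, mirroring the snippet above (not necessarily the exact final code): lower-case the value only for matching the `earliest`/`latest` keywords, and hand the original, case-preserving JSON to `SpecificOffsets`.

```scala
val startingOffsets = caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim) match {
  case Some(value) if value.toLowerCase == "latest" => LatestOffsets
  case Some(value) if value.toLowerCase == "earliest" => EarliestOffsets
  case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json)) // original casing preserved
  case None => LatestOffsets
}
```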

Thanks to cbowden for reporting.

Jenkins

Author: uncleGen <hustyugm@gmail.com>

Closes #17209 from uncleGen/SPARK-19853.

(cherry picked from commit 0a4d06a)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
## What changes were proposed in this pull request?
The `RemoveRedundantAlias` rule can change the output attributes (the expression IDs, to be precise) of a query by eliminating the redundant aliases producing them. This is no problem for a regular query, but it can cause problems for correlated subqueries: the attributes produced by the subquery are used in the parent plan; changing them will break the parent plan.

This PR fixes this by wrapping a subquery in a `Subquery` top level node when it gets optimized. The `RemoveRedundantAlias` rule now recognizes `Subquery` and makes sure that the output attributes of the `Subquery` node are retained.

## How was this patch tested?
Added a test case to `RemoveRedundantAliasAndProjectSuite` and added a regression test to `SubquerySuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17278 from hvanhovell/SPARK-19933.

(cherry picked from commit e04c05c)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`; when we read the data back, we should respect this special directory name and treat it as null.

This is the same behavior as Impala; see https://issues.apache.org/jira/browse/IMPALA-252
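
A hedged usage sketch (assumes `spark.implicits._` is in scope; the output path and column names are illustrative):

```scala
// The null partition value lands under part=__HIVE_DEFAULT_PARTITION__ on disk
// and is read back as null.
Seq((1, "a"), (2, null: String)).toDF("value", "part")
  .write.mode("overwrite").partitionBy("part").parquet("/tmp/null-partition-demo")
spark.read.parquet("/tmp/null-partition-demo").where("part IS NULL").show()
```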

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17277 from cloud-fan/partition.

(cherry picked from commit dacc382)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…-2.1)

## What changes were proposed in this pull request?
This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch uses type aliases to keep CatalystConf (as a type alias) and SimpleCatalystConf (as a concrete class that extends SQLConf).

Motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then we have to duplicate config options that impact optimizer/analyzer in sql/catalyst using CatalystConf.

This is a backport into branch-2.1 to minimize merge conflicts.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #17301 from rxin/branch-2.1-conf.
…ion for coalesce/repartition

## What changes were proposed in this pull request?

This PR proposes to use the correct deserializer, `BatchedSerializer`, for RDD construction for coalesce/repartition when the shuffle is enabled. Currently, it passes `UTF8Deserializer` as-is instead of the `BatchedSerializer` from the copied RDD.

with the file, `text.txt` below:

```
a
b

d
e
f
g
h
i
j
k
l

```

- Before

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
UTF8Deserializer(True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/rdd.py", line 811, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
    yield self.loads(stream)
  File ".../spark/python/pyspark/serializers.py", line 544, in loads
    return s.decode("utf-8") if self.use_unicode else s
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```

- After

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
[u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
```

## How was this patch tested?

Unit test in `python/pyspark/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17282 from HyukjinKwon/SPARK-19872.

(cherry picked from commit 7387126)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
…e table with a non pre-existing location should succeed

## What changes were proposed in this pull request?

This is a backport pr of #16672 into branch-2.1.

## How was this patch tested?
Existing tests.

Author: windpiger <songjun@outlook.com>

Closes #17317 from windpiger/backport-insertnotexists.
…L] Backport Three Cache-related PRs to Spark 2.1

### What changes were proposed in this pull request?

Backport a few cache related PRs:

---
[[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression](#16493)

Consider the plans inside subquery expressions while looking up cache manager to make
use of cached data. Currently CacheManager.useCachedData does not consider the
subquery expressions in the plan.

---
[[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path](#17064)

Catalog.refreshByPath can refresh the cache entry and the associated metadata for all DataFrames (if any) that contain the given data source path.

However, CacheManager.invalidateCachedPath doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

---
[[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table](#17097)

When un-caching a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to it. The following commands trigger the table uncache: `DropTableCommand`, `TruncateTableCommand`, `AlterTableRenameCommand`, `UncacheTableCommand`, `RefreshTable` and `InsertIntoHiveTable`.

This PR also includes some refactors:
- use java.util.LinkedList to store the cache entries, so that it's safer to remove elements while iterating
- rename invalidateCache to recacheByPlan, which is more obvious about what it does.

### How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17319 from gatorsmile/backport-17097.
… in log files

## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.

## What changes were proposed in this pull request?

This patch made two major changes:
1. added a `parseVersion(...)` method (a sketch of such a helper follows this list) and, based on it, fixed the way the following places did version checking (no other place needed to do this checking):
```
HDFSMetadataLog
  - CompactibleFileStreamLog  ------------> fixed with this patch
    - FileStreamSourceLog  ---------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  -----------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  ---> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
    - note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
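
A hedged sketch of such a helper; the signature and messages are assumed rather than copied from the patch.

```scala
// Accept "v<positive int>"; reject anything newer than this build can read, and fail
// loudly on malformed input instead of producing a confusing downstream error.
def parseVersion(text: String, maxSupportedVersion: Int): Int = {
  if (text.nonEmpty && text.head == 'v') {
    val version = scala.util.Try(text.tail.toInt).getOrElse(-1)
    if (version > 0) {
      if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
            s"but encountered v$version. Please upgrade.")
      }
      return version
    }
  }
  throw new IllegalStateException(s"Failed to parse log version from: $text")
}
```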

## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trw0000gn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

unit tests

Author: Liwei Lin <lwlin7@gmail.com>

Closes #17327 from lw-lin/good-msg-2.1.
…e stable

## What changes were proposed in this pull request?

Sometimes, CheckpointTests will hang on a busy machine because the streaming jobs are too slow and cannot catch up. I observed the scheduled delay was keeping increasing for dozens of seconds locally.

This PR increases the batch interval from 0.5 seconds to 2 seconds to generate less Spark jobs. It should make `pyspark.streaming.tests.CheckpointTests` more stable. I also replaced `sleep` with `awaitTerminationOrTimeout` so that if the streaming job fails, it will also fail the test.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17323 from zsxwing/SPARK-19986.

(cherry picked from commit 376d782)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
felixcheung and others added 26 commits November 7, 2017 20:57
## What changes were proposed in this pull request?

Will need to port this to branch-~~1.6~~, -2.0, -2.1, -2.2

## How was this patch tested?

manually
Jenkins, AppVeyor

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #19620 from felixcheung/rcranversioncheck21.
…lease-build.sh

## What changes were proposed in this pull request?

This PR proposes to use `/usr/sbin/lsof` if `lsof` is missing from the path, to fix the nightly snapshot Jenkins jobs. Please refer to #19359 (comment):

> Looks like some of the snapshot builds are having lsof issues:
>
> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-branch-2.1-maven-snapshots/182/console
>
>https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-branch-2.2-maven-snapshots/134/console
>
>spark-build/dev/create-release/release-build.sh: line 344: lsof: command not found
>usage: kill [ -s signal | -p ] [ -a ] pid ...
>kill -l [ signal ]

To my knowledge, the full path of `lsof` is required for non-root users on a few OSes.

## How was this patch tested?

Manually tested as below:

```bash
#!/usr/bin/env bash

LSOF=lsof
if ! hash $LSOF 2>/dev/null; then
  echo "a"
  LSOF=/usr/sbin/lsof
fi

$LSOF -P | grep "a"
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19695 from HyukjinKwon/SPARK-22377.

(cherry picked from commit c8b7f97)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
…C data source

## What changes were proposed in this pull request?

Let’s say I have a nested AND expression, shown below, where p2 cannot be pushed down:

(p1 AND p2) OR p3

In the current Spark code, during data source filter translation, (p1 AND p2) is returned as p1 only and p2 is simply lost. This issue occurs with the JDBC data source and is similar to [SPARK-12218](#10362) for Parquet. When we have an AND nested below another expression, we should either push both legs or nothing.

Note that:
- The current Spark code will always split conjunctive predicate before it determines if a predicate can be pushed down or not
- If I have (p1 AND p2) AND p3, it will be split into p1, p2, p3. There won't be a nested AND expression.
- The current Spark code logic for OR is OK. It either pushes both legs or nothing.

The same translation method is also called by Data Source V2.
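
A hedged sketch of the rule using Spark's public `sources.Filter` API; the helper name is illustrative.

```scala
import org.apache.spark.sql.sources

// Push an And down only when both legs translate; if either leg cannot be pushed,
// drop the whole conjunct rather than silently losing one leg.
def translateAnd(
    left: Option[sources.Filter],
    right: Option[sources.Filter]): Option[sources.Filter] = {
  for (l <- left; r <- right) yield sources.And(l, r)
}
```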

## How was this patch tested?

Added new unit test cases to JDBCSuite

gatorsmile

Author: Jia Li <jiali@us.ibm.com>

Closes #19776 from jliwork/spark-22548.

(cherry picked from commit 881c5c8)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
… with Janino when compiling generated code.

## What changes were proposed in this pull request?

Bump up the Janino dependency version to fix a thread safety issue when compiling generated code.

## How was this patch tested?

Check https://issues.apache.org/jira/browse/SPARK-22373 for details.
Converted part of the code in CodeGenerator into a standalone application, so the issue can be consistently reproduced locally.
Verified that changing Janino dependency version resolved this issue.

Author: Min Shen <mshen@linkedin.com>

Closes #19839 from Victsm/SPARK-22373.

(cherry picked from commit 7da1f57)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…ists too for Janino

## What changes were proposed in this pull request?

This PR updates the other lists missed in branch-2.1 for the Janino dependency update.

## How was this patch tested?

Jenkins tests

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19866 from HyukjinKwon/branch-2.1-SPARK-22373.
This PR upgrades the Janino version to 3.0.8. [Janino 3.0.8](https://janino-compiler.github.io/janino/changelog.html) includes an important fix to reduce the number of constant pool entries by using the 'sipush' Java bytecode.

* SIPUSH bytecode is not used for short integer constant [#33](janino-compiler/janino#33).

Please see detail in [this discussion thread](#19518 (comment)).

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19890 from kiszk/SPARK-22688.

(cherry picked from commit 8ae004b)
Signed-off-by: Sean Owen <sowen@cloudera.com>
…oder

This behavior has confused some users, so let's clarify it.

Author: Michael Armbrust <michael@databricks.com>

Closes #20048 from marmbrus/datasetAsDocs.

(cherry picked from commit 8df1da3)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…itions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?
This PR backports [#20244](#20244)

When we run concurrent jobs using the same RDD that is marked for checkpointing, one job may finish and start RDD.doCheckpoint while another job is being submitted, so submitStage and submitMissingTasks are called. In [submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932), taskBinaryBytes is serialized and the task partitions are calculated, and both are affected by the checkpoint status. If the former is computed before doCheckpoint finishes while the latter is computed after it finishes, then when the task runs and rdd.compute is called, RDDs with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), which cast the partition type, get a ClassCastException because the part parameter is actually a CheckpointRDDPartition.
This error occurs because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization happens in the DAGScheduler's event loop.

## How was this patch tested?
The existing tests.

Author: huangtengfei <huangtengfei@huangtengfeideMacBook-Pro.local>

Closes #20635 from ivoson/branch-2.1-23053.
…rashes

There is a race condition introduced in SPARK-11141 which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch, and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents data loss.

Additional unit test + manually.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…rectly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return wrong answer if `AccumulableParam` doesn't define equals/hashCode.

This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.
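
A hedged sketch of the idea as a free-standing helper, not the wrapper's actual method:

```scala
// Compare the current value with the remembered zero value by reference, so a
// user-supplied AccumulableParam without equals/hashCode still answers isZero correctly.
def isZeroByReference[R](currentValue: R, zeroValue: R): Boolean =
  currentValue.asInstanceOf[AnyRef] eq zeroValue.asInstanceOf[AnyRef]
```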

## How was this patch tested?

a new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This PR aims to bump Py4J in order to fix the following float/double bug.
Py4J 0.10.5 fixes this (py4j/py4j#272) and the latest Py4J is 0.10.6.

**BEFORE**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+--------------------+
|(id + 17.1335742042)|
+--------------------+
|       17.1335742042|
+--------------------+
```

**AFTER**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+-------------------------+
|(id + 17.133574204226083)|
+-------------------------+
|       17.133574204226083|
+-------------------------+
```

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18546 from dongjoon-hyun/SPARK-21278.

(cherry picked from commit c8d0aba)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit cc613b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 323dc3a)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 850b7d8)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 628c7b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 16cd9ac)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit f96d13d)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

Backport the part of the commit that addresses the lintr issue.

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #21325 from felixcheung/rlintfix22.

(cherry picked from commit 8c223b6)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
…rong

LongToUnsafeRowMap has a mistake when growing its page array: it blindly grows to `oldSize * 2`, while the new record may be larger than `oldSize * 2`. Then we may have a malformed UnsafeRow when querying this map, whose actual data is smaller than its declared size, and the data is corrupted.
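
A hedged sketch of the sizing fix; the names are illustrative.

```scala
// Grow the page to at least what the incoming record needs,
// not blindly to twice the old capacity.
def nextPageSize(oldSize: Long, neededSize: Long): Long = math.max(oldSize * 2, neededSize)
```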

Author: sychen <sychen@ctrip.com>

Closes #21311 from cxzl25/fix_LongToUnsafeRowMap_page_size.

(cherry picked from commit 8883401)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
As discussed separately, this avoids the possibility of XSS on certain request param keys.

CC vanzin

Author: Sean Owen <srowen@gmail.com>

Closes #21464 from srowen/XSS2.

(cherry picked from commit 698b9a0)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Apply the suggestion on the bug to fix source links. Tested with
the 2.3.1 release docs.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21521 from vanzin/SPARK-23732.

(cherry picked from commit dc22465)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
stageAttemptId was added to TaskContext, along with the corresponding construction modifications.

Added a new test in TaskContextSuite, two cases are tested:
1. Normal case without failure
2. Exception case with resubmitted stages

Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897)

Author: Xianjin YE <advancedxy@gmail.com>

Closes #20082 from advancedxy/SPARK-22897.

(cherry picked from commit a6fc300)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…ator [branch-2.1].

When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.

(cherry picked from commit c8e909c)
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 751b008)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon

@rajesh7738 This seems to have been opened by mistake. Mind closing it, please?

rajesh7738 closed this Jul 14, 2018
@rajesh7738
Author

Yes, closed.
