Skip to content

Branch 2.3#22242

Closed
ArunkumarRamanan wants to merge 584 commits intomasterfrom
branch-2.3
Closed

Branch 2.3#22242
ArunkumarRamanan wants to merge 584 commits intomasterfrom
branch-2.3

Conversation

@ArunkumarRamanan
Copy link

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Marcelo Vanzin and others added 30 commits March 7, 2018 17:06
…uncher test.

First the bad news: there's an unfixable race in the launcher code.
(By unfixable I mean it would take a lot more effort than this change
to fix it.) The good news is that it should only affect super short
lived applications, such as the one run by the flaky test, so it's
possible to work around it in our test.

The fix also uncovered an issue with the recently added "closeAndWait()"
method; closing the connection would still possibly cause data loss,
so this change waits a while for the connection to finish itself, and
closes the socket if that times out. The existing connection timeout
is reused so that if desired it's possible to control how long to wait.

As part of that I also restored the old behavior that disconnect() would
force a disconnection from the child app; the "wait for data to arrive"
approach is only taken when disposing of the handle.

I tested this by inserting a bunch of sleeps in the test and the socket
handling code in the launcher library; with those I was able to reproduce
the error from the jenkins jobs. With the changes, even with all the
sleeps still in place, all tests pass.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20743 from vanzin/SPARK-23020.
…uption.

## What changes were proposed in this pull request?

In current code, all local blocks will be checked for corruption no matter it's big or not.  The reasons are as below:

Size in FetchResult for local block is set to be 0 (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327)
SPARK-4105 meant to only check the small blocks(size<maxBytesInFlight/3), but for reason 1, below check will be invalid. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420

We can fix this and avoid the OOM.

## How was this patch tested?

UT added

Author: jx158167 <jx158167@antfin.com>

Closes #20685 from jinxing64/SPARK-23524.

(cherry picked from commit 77c91cc)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…table in CreateTable

Backport #20660 to branch 2.3
=====================================
## What changes were proposed in this pull request?

For CreateTable with Append mode, we should check if `storage.locationUri` is the same with existing table in `PreprocessTableCreation`

In the current code, there is only a simple exception if the `storage.locationUri` is different with existing table:
`org.apache.spark.sql.AnalysisException: Table or view not found:`

which can be improved.

## How was this patch tested?

Unit test

Author: Wang Gengliang <gengliang.wang@databricks.com>

Closes #20766 from gengliangwang/backport_20660_to_2.3.
This is a backport of #20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionPlan), we were rewriting the attribute reference in the streaming plan with the new attribute references from the batch plan. This was incorrectly handling the scenario when multiple StreamingExecutionRelation point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.
```
val df = input.toDF
val join =
      df.select('value % 5 as "key", 'value).join(
        df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for spliced, the new output attributes (value#66) replace the earlier output attributes (value#12, and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaces by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new reference generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]    // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20765 from tdas/SPARK-23406-2.3.
…an be casted to Date

This PR is to backport #20621 to branch 2.3�

---
## What changes were proposed in this pull request?

Before the patch, Spark could infer as Date a partition value which cannot be casted to Date (this can happen when there are extra characters after a valid date, like `2018-02-15AAA`).

When this happens and the input format has metadata which define the schema of the table, then `null` is returned as a value for the partition column, because the `cast` operator used in (`PartitioningAwareFileIndex.inferPartitioning`) is unable to convert the value.

The PR checks in the partition inference that values can be casted to Date and Timestamp, in order to infer that datatype to them.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20764 from gatorsmile/backport23436.
…ffect.

This change restores functionality that was inadvertently removed as part
of the fix for SPARK-22372.

Also modified an existing unit test to make sure the feature works as intended.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20776 from vanzin/SPARK-23630.

(cherry picked from commit 2c36736)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…rn 1 + num of expressions

## What changes were proposed in this pull request?

Backport of ea48099 to branch 2.3.

## How was this patch tested?

added UT

cc cloud-fan hvanhovell

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20783 from mgaido91/SPARK-23628_2.3.
…data from JSON

## What changes were proposed in this pull request?

The from_json() function accepts an additional parameter, where the user might specify the schema. The issue is that the specified schema might not be compatible with data. In particular, the JSON data might be missing data for fields declared as non-nullable in the schema. The from_json() function does not verify the data against such errors. When data with missing fields is sent to the parquet encoder, there is no verification either. The end results is a corrupt parquet file.

To avoid corruptions, make sure that all fields in the user-specified schema are set to be nullable.
Since this changes the behavior of a public function, we need to include it in release notes.
The behavior can be reverted by setting `spark.sql.fromJsonForceNullableSchema=false`

## How was this patch tested?

Added two new tests.

Author: Michał Świtakowski <michal.switakowski@databricks.com>

Closes #20694 from mswit-databricks/SPARK-23173.

(cherry picked from commit 2ca9bb0)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?

Revise doc of method pushFilters in SupportsPushDownFilters/SupportsPushDownCatalystFilters

In `FileSourceStrategy`, except `partitionKeyFilters`(the references of which is subset of partition keys), all filters needs to be evaluated after scanning. Otherwise, Spark will get wrong result from data sources like Orc/Parquet.

This PR is to improve the doc.

Author: Wang Gengliang <gengliang.wang@databricks.com>

Closes #20769 from gengliangwang/revise_pushdown_doc.

(cherry picked from commit 10b0657)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DylanGuedes <djmgguedesgmail.com>

## What changes were proposed in this pull request?

Changes variable name conflict: [input is a built-in python function](https://stackoverflow.com/questions/20670732/is-input-a-keyword-in-python).

## How was this patch tested?

I runned the example and it works fine.

Author: DylanGuedes <djmgguedes@gmail.com>

Closes #20775 from DylanGuedes/input_variable.

(cherry picked from commit b6f837c)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
## What changes were proposed in this pull request?

The error message ```s"""Field "$name" does not exist."""``` is thrown when looking up an unknown field in StructType. In the error message, we should also contain the information about which columns/fields exist in this struct.

## How was this patch tested?

Added new unit tests.

Note: I created a new `StructTypeSuite.scala` as I couldn't find an existing suite that's suitable to place these tests. I may be missing something so feel free to propose new locations.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xiayun Sun <xiayunsun@gmail.com>

Closes #20649 from xysun/SPARK-23462.

(cherry picked from commit b304e07)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
…he rule OptimizeMetadataOnlyQuery

This PR is to backport #20684 and #20693 to Spark 2.3 branch

---

## What changes were proposed in this pull request?
```Scala
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
 Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
 .write.json(tablePath.getCanonicalPath)
 val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
 df.show()
```

It generates a wrong result.
```
[c,e,a]
```

We have a bug in the rule `OptimizeMetadataOnlyQuery `. We should respect the attribute order in the original leaf node. This PR is to fix it.

## How was this patch tested?
Added a test case

Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #20763 from gatorsmile/backport23523.
…d runtime error for a large query

## What changes were proposed in this pull request?

This PR fixes runtime error regarding a large query when a generated code has split classes. The issue is `append()`, `stopEarly()`, and other methods are not accessible from split classes that are not subclasses of `BufferedRowIterator`.
This PR fixes this issue by making them `public`.

Before applying the PR, we see the following exception by running the attached program with `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1`.
```
  test("SPARK-23598") {
    // When set -1 to CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD, an exception is thrown
    val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
    df_pet_age.groupBy("name").avg("age").show()
  }
```

Exception:
```
19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
...
```

Generated code (line 195 calles `stopEarly()`).
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private double agg_bufValue;
/* 012 */   private boolean agg_bufIsNull1;
/* 013 */   private long agg_bufValue1;
/* 014 */   private agg_FastHashMap agg_fastHashMap;
/* 015 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter;
/* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */   private scala.collection.Iterator inputadapter_input;
/* 020 */   private boolean agg_agg_isNull11;
/* 021 */   private boolean agg_agg_isNull25;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */   private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */
/* 034 */     agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */     agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */     agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0], 32);
/* 039 */     agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0], 1);
/* 040 */     agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */     agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1], 32);
/* 042 */     agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1], 3);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   public class agg_FastHashMap {
/* 047 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 048 */     private int[] buckets;
/* 049 */     private int capacity = 1 << 16;
/* 050 */     private double loadFactor = 0.5;
/* 051 */     private int numBuckets = (int) (capacity / loadFactor);
/* 052 */     private int maxSteps = 2;
/* 053 */     private int numRows = 0;
/* 054 */     private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1] /* keyName */), org.apache.spark.sql.types.DataTypes.StringType);
/* 055 */     private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2] /* keyName */), org.apache.spark.sql.types.DataTypes.DoubleType)
/* 056 */     .add(((java.lang.String) references[3] /* keyName */), org.apache.spark.sql.types.DataTypes.LongType);
/* 057 */     private Object emptyVBase;
/* 058 */     private long emptyVOff;
/* 059 */     private int emptyVLen;
/* 060 */     private boolean isBatchFull = false;
/* 061 */
/* 062 */     public agg_FastHashMap(
/* 063 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 064 */       InternalRow emptyAggregationBuffer) {
/* 065 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 066 */       .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
/* 067 */
/* 068 */       final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
/* 069 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 070 */
/* 071 */       emptyVBase = emptyBuffer;
/* 072 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 073 */       emptyVLen = emptyBuffer.length;
/* 074 */
/* 075 */       buckets = new int[numBuckets];
/* 076 */       java.util.Arrays.fill(buckets, -1);
/* 077 */     }
/* 078 */
/* 079 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(UTF8String agg_key) {
/* 080 */       long h = hash(agg_key);
/* 081 */       int step = 0;
/* 082 */       int idx = (int) h & (numBuckets - 1);
/* 083 */       while (step < maxSteps) {
/* 084 */         // Return bucket index if it's either an empty slot or already contains the key
/* 085 */         if (buckets[idx] == -1) {
/* 086 */           if (numRows < capacity && !isBatchFull) {
/* 087 */             // creating the unsafe for new entry
/* 088 */             UnsafeRow agg_result = new UnsafeRow(1);
/* 089 */             org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
/* 090 */             = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 091 */               32);
/* 092 */             org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
/* 093 */             = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 094 */               agg_holder,
/* 095 */               1);
/* 096 */             agg_holder.reset(); //TODO: investigate if reset or zeroout are actually needed
/* 097 */             agg_rowWriter.zeroOutNullBytes();
/* 098 */             agg_rowWriter.write(0, agg_key);
/* 099 */             agg_result.setTotalSize(agg_holder.totalSize());
/* 100 */             Object kbase = agg_result.getBaseObject();
/* 101 */             long koff = agg_result.getBaseOffset();
/* 102 */             int klen = agg_result.getSizeInBytes();
/* 103 */
/* 104 */             UnsafeRow vRow
/* 105 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 106 */             if (vRow == null) {
/* 107 */               isBatchFull = true;
/* 108 */             } else {
/* 109 */               buckets[idx] = numRows++;
/* 110 */             }
/* 111 */             return vRow;
/* 112 */           } else {
/* 113 */             // No more space
/* 114 */             return null;
/* 115 */           }
/* 116 */         } else if (equals(idx, agg_key)) {
/* 117 */           return batch.getValueRow(buckets[idx]);
/* 118 */         }
/* 119 */         idx = (idx + 1) & (numBuckets - 1);
/* 120 */         step++;
/* 121 */       }
/* 122 */       // Didn't find it
/* 123 */       return null;
/* 124 */     }
/* 125 */
/* 126 */     private boolean equals(int idx, UTF8String agg_key) {
/* 127 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 128 */       return (row.getUTF8String(0).equals(agg_key));
/* 129 */     }
/* 130 */
/* 131 */     private long hash(UTF8String agg_key) {
/* 132 */       long agg_hash = 0;
/* 133 */
/* 134 */       int agg_result = 0;
/* 135 */       byte[] agg_bytes = agg_key.getBytes();
/* 136 */       for (int i = 0; i < agg_bytes.length; i++) {
/* 137 */         int agg_hash1 = agg_bytes[i];
/* 138 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
/* 139 */       }
/* 140 */
/* 141 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);
/* 142 */
/* 143 */       return agg_hash;
/* 144 */     }
/* 145 */
/* 146 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 147 */       return batch.rowIterator();
/* 148 */     }
/* 149 */
/* 150 */     public void close() {
/* 151 */       batch.close();
/* 152 */     }
/* 153 */
/* 154 */   }
/* 155 */
/* 156 */   protected void processNext() throws java.io.IOException {
/* 157 */     if (!agg_initAgg) {
/* 158 */       agg_initAgg = true;
/* 159 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 160 */       agg_nestedClassInstance1.agg_doAggregateWithKeys();
/* 161 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
/* 162 */     }
/* 163 */
/* 164 */     // output the result
/* 165 */
/* 166 */     while (agg_fastHashMapIter.next()) {
/* 167 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_fastHashMapIter.getKey();
/* 168 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_fastHashMapIter.getValue();
/* 169 */       wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 170 */
/* 171 */       if (shouldStop()) return;
/* 172 */     }
/* 173 */     agg_fastHashMap.close();
/* 174 */
/* 175 */     while (agg_mapIter.next()) {
/* 176 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 177 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 178 */       wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 179 */
/* 180 */       if (shouldStop()) return;
/* 181 */     }
/* 182 */
/* 183 */     agg_mapIter.close();
/* 184 */     if (agg_sorter == null) {
/* 185 */       agg_hashMap.free();
/* 186 */     }
/* 187 */   }
/* 188 */
/* 189 */   private wholestagecodegen_NestedClass wholestagecodegen_nestedClassInstance = new wholestagecodegen_NestedClass();
/* 190 */   private agg_NestedClass1 agg_nestedClassInstance1 = new agg_NestedClass1();
/* 191 */   private agg_NestedClass agg_nestedClassInstance = new agg_NestedClass();
/* 192 */
/* 193 */   private class agg_NestedClass1 {
/* 194 */     private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 195 */       while (inputadapter_input.hasNext() && !stopEarly()) {
/* 196 */         InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 197 */         int inputadapter_value = inputadapter_row.getInt(0);
/* 198 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 199 */         UTF8String inputadapter_value1 = inputadapter_isNull1 ?
/* 200 */         null : (inputadapter_row.getUTF8String(1));
/* 201 */
/* 202 */         agg_nestedClassInstance.agg_doConsume(inputadapter_row, inputadapter_value, inputadapter_value1, inputadapter_isNull1);
/* 203 */         if (shouldStop()) return;
/* 204 */       }
/* 205 */
/* 206 */       agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 207 */       agg_mapIter = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap, agg_sorter, ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* avgHashProbe */));
/* 208 */
/* 209 */     }
/* 210 */
/* 211 */   }
/* 212 */
/* 213 */   private class wholestagecodegen_NestedClass {
/* 214 */     private void agg_doAggregateWithKeysOutput(UnsafeRow agg_keyTerm, UnsafeRow agg_bufferTerm)
/* 215 */     throws java.io.IOException {
/* 216 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 217 */
/* 218 */       boolean agg_isNull35 = agg_keyTerm.isNullAt(0);
/* 219 */       UTF8String agg_value37 = agg_isNull35 ?
/* 220 */       null : (agg_keyTerm.getUTF8String(0));
/* 221 */       boolean agg_isNull36 = agg_bufferTerm.isNullAt(0);
/* 222 */       double agg_value38 = agg_isNull36 ?
/* 223 */       -1.0 : (agg_bufferTerm.getDouble(0));
/* 224 */       boolean agg_isNull37 = agg_bufferTerm.isNullAt(1);
/* 225 */       long agg_value39 = agg_isNull37 ?
/* 226 */       -1L : (agg_bufferTerm.getLong(1));
/* 227 */
/* 228 */       agg_mutableStateArray1[1].reset();
/* 229 */
/* 230 */       agg_mutableStateArray2[1].zeroOutNullBytes();
/* 231 */
/* 232 */       if (agg_isNull35) {
/* 233 */         agg_mutableStateArray2[1].setNullAt(0);
/* 234 */       } else {
/* 235 */         agg_mutableStateArray2[1].write(0, agg_value37);
/* 236 */       }
/* 237 */
/* 238 */       if (agg_isNull36) {
/* 239 */         agg_mutableStateArray2[1].setNullAt(1);
/* 240 */       } else {
/* 241 */         agg_mutableStateArray2[1].write(1, agg_value38);
/* 242 */       }
/* 243 */
/* 244 */       if (agg_isNull37) {
/* 245 */         agg_mutableStateArray2[1].setNullAt(2);
/* 246 */       } else {
/* 247 */         agg_mutableStateArray2[1].write(2, agg_value39);
/* 248 */       }
/* 249 */       agg_mutableStateArray[1].setTotalSize(agg_mutableStateArray1[1].totalSize());
/* 250 */       append(agg_mutableStateArray[1]);
/* 251 */
/* 252 */     }
/* 253 */
/* 254 */   }
/* 255 */
/* 256 */   private class agg_NestedClass {
/* 257 */     private void agg_doConsume(InternalRow inputadapter_row, int agg_expr_0, UTF8String agg_expr_1, boolean agg_exprIsNull_1) throws java.io.IOException {
/* 258 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 259 */       UnsafeRow agg_fastAggBuffer = null;
/* 260 */
/* 261 */       if (true) {
/* 262 */         if (!agg_exprIsNull_1) {
/* 263 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 264 */             agg_expr_1);
/* 265 */         }
/* 266 */       }
/* 267 */       // Cannot find the key in fast hash map, try regular hash map.
/* 268 */       if (agg_fastAggBuffer == null) {
/* 269 */         // generate grouping key
/* 270 */         agg_mutableStateArray1[0].reset();
/* 271 */
/* 272 */         agg_mutableStateArray2[0].zeroOutNullBytes();
/* 273 */
/* 274 */         if (agg_exprIsNull_1) {
/* 275 */           agg_mutableStateArray2[0].setNullAt(0);
/* 276 */         } else {
/* 277 */           agg_mutableStateArray2[0].write(0, agg_expr_1);
/* 278 */         }
/* 279 */         agg_mutableStateArray[0].setTotalSize(agg_mutableStateArray1[0].totalSize());
/* 280 */         int agg_value7 = 42;
/* 281 */
/* 282 */         if (!agg_exprIsNull_1) {
/* 283 */           agg_value7 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1.getBaseObject(), agg_expr_1.getBaseOffset(), agg_expr_1.numBytes(), agg_value7);
/* 284 */         }
/* 285 */         if (true) {
/* 286 */           // try to get the buffer from hash map
/* 287 */           agg_unsafeRowAggBuffer =
/* 288 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], agg_value7);
/* 289 */         }
/* 290 */         // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 291 */         // aggregation after processing all input rows.
/* 292 */         if (agg_unsafeRowAggBuffer == null) {
/* 293 */           if (agg_sorter == null) {
/* 294 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 295 */           } else {
/* 296 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 297 */           }
/* 298 */
/* 299 */           // the hash map had be spilled, it should have enough memory now,
/* 300 */           // try to allocate buffer again.
/* 301 */           agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(
/* 302 */             agg_mutableStateArray[0], agg_value7);
/* 303 */           if (agg_unsafeRowAggBuffer == null) {
/* 304 */             // failed to allocate the first page
/* 305 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 306 */           }
/* 307 */         }
/* 308 */
/* 309 */       }
/* 310 */
/* 311 */       if (agg_fastAggBuffer != null) {
/* 312 */         // common sub-expressions
/* 313 */         boolean agg_isNull21 = false;
/* 314 */         long agg_value23 = -1L;
/* 315 */         if (!false) {
/* 316 */           agg_value23 = (long) agg_expr_0;
/* 317 */         }
/* 318 */         // evaluate aggregate function
/* 319 */         boolean agg_isNull23 = true;
/* 320 */         double agg_value25 = -1.0;
/* 321 */
/* 322 */         boolean agg_isNull24 = agg_fastAggBuffer.isNullAt(0);
/* 323 */         double agg_value26 = agg_isNull24 ?
/* 324 */         -1.0 : (agg_fastAggBuffer.getDouble(0));
/* 325 */         if (!agg_isNull24) {
/* 326 */           agg_agg_isNull25 = true;
/* 327 */           double agg_value27 = -1.0;
/* 328 */           do {
/* 329 */             boolean agg_isNull26 = agg_isNull21;
/* 330 */             double agg_value28 = -1.0;
/* 331 */             if (!agg_isNull21) {
/* 332 */               agg_value28 = (double) agg_value23;
/* 333 */             }
/* 334 */             if (!agg_isNull26) {
/* 335 */               agg_agg_isNull25 = false;
/* 336 */               agg_value27 = agg_value28;
/* 337 */               continue;
/* 338 */             }
/* 339 */
/* 340 */             boolean agg_isNull27 = false;
/* 341 */             double agg_value29 = -1.0;
/* 342 */             if (!false) {
/* 343 */               agg_value29 = (double) 0;
/* 344 */             }
/* 345 */             if (!agg_isNull27) {
/* 346 */               agg_agg_isNull25 = false;
/* 347 */               agg_value27 = agg_value29;
/* 348 */               continue;
/* 349 */             }
/* 350 */
/* 351 */           } while (false);
/* 352 */
/* 353 */           agg_isNull23 = false; // resultCode could change nullability.
/* 354 */           agg_value25 = agg_value26 + agg_value27;
/* 355 */
/* 356 */         }
/* 357 */         boolean agg_isNull29 = false;
/* 358 */         long agg_value31 = -1L;
/* 359 */         if (!false && agg_isNull21) {
/* 360 */           boolean agg_isNull31 = agg_fastAggBuffer.isNullAt(1);
/* 361 */           long agg_value33 = agg_isNull31 ?
/* 362 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 363 */           agg_isNull29 = agg_isNull31;
/* 364 */           agg_value31 = agg_value33;
/* 365 */         } else {
/* 366 */           boolean agg_isNull32 = true;
/* 367 */           long agg_value34 = -1L;
/* 368 */
/* 369 */           boolean agg_isNull33 = agg_fastAggBuffer.isNullAt(1);
/* 370 */           long agg_value35 = agg_isNull33 ?
/* 371 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 372 */           if (!agg_isNull33) {
/* 373 */             agg_isNull32 = false; // resultCode could change nullability.
/* 374 */             agg_value34 = agg_value35 + 1L;
/* 375 */
/* 376 */           }
/* 377 */           agg_isNull29 = agg_isNull32;
/* 378 */           agg_value31 = agg_value34;
/* 379 */         }
/* 380 */         // update fast row
/* 381 */         if (!agg_isNull23) {
/* 382 */           agg_fastAggBuffer.setDouble(0, agg_value25);
/* 383 */         } else {
/* 384 */           agg_fastAggBuffer.setNullAt(0);
/* 385 */         }
/* 386 */
/* 387 */         if (!agg_isNull29) {
/* 388 */           agg_fastAggBuffer.setLong(1, agg_value31);
/* 389 */         } else {
/* 390 */           agg_fastAggBuffer.setNullAt(1);
/* 391 */         }
/* 392 */       } else {
/* 393 */         // common sub-expressions
/* 394 */         boolean agg_isNull7 = false;
/* 395 */         long agg_value9 = -1L;
/* 396 */         if (!false) {
/* 397 */           agg_value9 = (long) agg_expr_0;
/* 398 */         }
/* 399 */         // evaluate aggregate function
/* 400 */         boolean agg_isNull9 = true;
/* 401 */         double agg_value11 = -1.0;
/* 402 */
/* 403 */         boolean agg_isNull10 = agg_unsafeRowAggBuffer.isNullAt(0);
/* 404 */         double agg_value12 = agg_isNull10 ?
/* 405 */         -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
/* 406 */         if (!agg_isNull10) {
/* 407 */           agg_agg_isNull11 = true;
/* 408 */           double agg_value13 = -1.0;
/* 409 */           do {
/* 410 */             boolean agg_isNull12 = agg_isNull7;
/* 411 */             double agg_value14 = -1.0;
/* 412 */             if (!agg_isNull7) {
/* 413 */               agg_value14 = (double) agg_value9;
/* 414 */             }
/* 415 */             if (!agg_isNull12) {
/* 416 */               agg_agg_isNull11 = false;
/* 417 */               agg_value13 = agg_value14;
/* 418 */               continue;
/* 419 */             }
/* 420 */
/* 421 */             boolean agg_isNull13 = false;
/* 422 */             double agg_value15 = -1.0;
/* 423 */             if (!false) {
/* 424 */               agg_value15 = (double) 0;
/* 425 */             }
/* 426 */             if (!agg_isNull13) {
/* 427 */               agg_agg_isNull11 = false;
/* 428 */               agg_value13 = agg_value15;
/* 429 */               continue;
/* 430 */             }
/* 431 */
/* 432 */           } while (false);
/* 433 */
/* 434 */           agg_isNull9 = false; // resultCode could change nullability.
/* 435 */           agg_value11 = agg_value12 + agg_value13;
/* 436 */
/* 437 */         }
/* 438 */         boolean agg_isNull15 = false;
/* 439 */         long agg_value17 = -1L;
/* 440 */         if (!false && agg_isNull7) {
/* 441 */           boolean agg_isNull17 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 442 */           long agg_value19 = agg_isNull17 ?
/* 443 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 444 */           agg_isNull15 = agg_isNull17;
/* 445 */           agg_value17 = agg_value19;
/* 446 */         } else {
/* 447 */           boolean agg_isNull18 = true;
/* 448 */           long agg_value20 = -1L;
/* 449 */
/* 450 */           boolean agg_isNull19 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 451 */           long agg_value21 = agg_isNull19 ?
/* 452 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 453 */           if (!agg_isNull19) {
/* 454 */             agg_isNull18 = false; // resultCode could change nullability.
/* 455 */             agg_value20 = agg_value21 + 1L;
/* 456 */
/* 457 */           }
/* 458 */           agg_isNull15 = agg_isNull18;
/* 459 */           agg_value17 = agg_value20;
/* 460 */         }
/* 461 */         // update unsafe row buffer
/* 462 */         if (!agg_isNull9) {
/* 463 */           agg_unsafeRowAggBuffer.setDouble(0, agg_value11);
/* 464 */         } else {
/* 465 */           agg_unsafeRowAggBuffer.setNullAt(0);
/* 466 */         }
/* 467 */
/* 468 */         if (!agg_isNull15) {
/* 469 */           agg_unsafeRowAggBuffer.setLong(1, agg_value17);
/* 470 */         } else {
/* 471 */           agg_unsafeRowAggBuffer.setNullAt(1);
/* 472 */         }
/* 473 */
/* 474 */       }
/* 475 */
/* 476 */     }
/* 477 */
/* 478 */   }
/* 479 */
/* 480 */ }
```

## How was this patch tested?

Added UT into `WholeStageCodegenSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20779 from kiszk/SPARK-23598.

(cherry picked from commit 1098933)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
# What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:

- NGramSuite
- NormalizerSuite
- OneHotEncoderEstimatorSuite
- OneHotEncoderSuite
- PCASuite
- PolynomialExpansionSuite
- QuantileDiscretizerSuite
- RFormulaSuite
- SQLTransformerSuite
- StandardScalerSuite
- StopWordsRemoverSuite
- StringIndexerSuite
- TokenizerSuite
- RegexTokenizerSuite
- VectorAssemblerSuite
- VectorIndexerSuite
- VectorSizeHintSuite
- VectorSlicerSuite
- Word2VecSuite

# How was this patch tested?

They are unit test.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes #20686 from attilapiros/SPARK-22915.

(cherry picked from commit 279b3db)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
Added/corrected scaladoc for isZero on the DoubleAccumulator, CollectionAccumulator, and LongAccumulator subclasses of AccumulatorV2, particularly noting where there are requirements in addition to having a value of zero in order to return true.

## What changes were proposed in this pull request?

Three scaladoc comments are updated in AccumulatorV2.scala
No changes outside of comment blocks were made.

## How was this patch tested?

Running "sbt unidoc", fixing style errors found, and reviewing the resulting local scaladoc in firefox.

Author: smallory <s.mallory@gmail.com>

Closes #20790 from smallory/patch-1.

(cherry picked from commit 4f5bad6)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
## What changes were proposed in this pull request?

This PR proposes to fix the error message for Kinesis in PySpark when its jar is missing but explicitly enabled.

```bash
ENABLE_KINESIS_TESTS=1 SPARK_TESTING=1 bin/pyspark pyspark.streaming.tests
```

Before:

```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/.../spark/python/pyspark/streaming/tests.py", line 1572, in <module>
    % kinesis_asl_assembly_dir) +
NameError: name 'kinesis_asl_assembly_dir' is not defined
```

After:

```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/.../spark/python/pyspark/streaming/tests.py", line 1576, in <module>
    "You need to build Spark with 'build/sbt -Pkinesis-asl "
Exception: Failed to find Spark Streaming Kinesis assembly jar in /.../spark/external/kinesis-asl-assembly. You need to build Spark with 'build/sbt -Pkinesis-asl assembly/package streaming-kinesis-asl-assembly/assembly'or 'build/mvn -Pkinesis-asl package' before running this test.
```

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20834 from HyukjinKwon/minor-variable.

(cherry picked from commit 56e8f48)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
…tLogger

## What changes were proposed in this pull request?

Changed `Logger` in `InProcessAppHandle` to use `InProcessAppHandle` instead of `ChildProcAppHandle`

Author: Sahil Takiar <stakiar@cloudera.com>

Closes #20815 from sahilTakiar/master.

(cherry picked from commit 7618896)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20814 from vanzin/SPARK-23671.

(cherry picked from commit 18f8575)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…parkUI and detachSparkUI functions to avoid concurrent modification issue to Jetty Handlers

Jetty handlers are dynamically attached/detached while SHS is running. But the attach and detach operations might be taking place at the same time due to the async in load/clear in Guava Cache.

## What changes were proposed in this pull request?
Add synchronization between attachSparkUI and detachSparkUI in SHS.

## How was this patch tested?
With this patch, the jetty handlers missing issue never happens again in our production cluster SHS.

Author: Ye Zhou <yezhou@linkedin.com>

Closes #20744 from zhouyejoe/SPARK-23608.

(cherry picked from commit 3675af7)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Clean up SparkPlanGraphWrapper objects from InMemoryStore together with cleaning up SQLExecutionUIData
existing unit test was extended to check also SparkPlanGraphWrapper object count

vanzin

Author: myroslavlisniak <acnipin@gmail.com>

Closes #20813 from myroslavlisniak/master.

(cherry picked from commit c2632ed)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
…ark.sql.sources.default`

## What changes were proposed in this pull request?

Currently, some tests have an assumption that `spark.sql.sources.default=parquet`. In fact, that is a correct assumption, but that assumption makes it difficult to test new data source format.

This PR aims to
- Improve test suites more robust and makes it easy to test new data sources in the future.
- Test new native ORC data source with the full existing Apache Spark test coverage.

As an example, the PR uses `spark.sql.sources.default=orc` during reviews. The value should be `parquet` when this PR is accepted.

## How was this patch tested?

Pass the Jenkins with updated tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20705 from dongjoon-hyun/SPARK-23553.

(cherry picked from commit 5414abc)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…afkaConsumer (branch-2.3)

This is a backport of #20767 to branch 2.3

## What changes were proposed in this pull request?
CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20848 from tdas/SPARK-23623-2.3.
…uce None in PySpark

Scala:

```
scala> spark.conf.get("hey", null)
res1: String = null
```

```
scala> spark.conf.get("spark.sql.sources.partitionOverwriteMode", null)
res2: String = null
```

Python:

**Before**

```
>>> spark.conf.get("hey", None)
...
py4j.protocol.Py4JJavaError: An error occurred while calling o30.get.
: java.util.NoSuchElementException: hey
...
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None)
u'STATIC'
```

**After**

```
>>> spark.conf.get("hey", None) is None
True
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None) is None
True
```

*Note that this PR preserves the case below:

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode")
u'STATIC'
```

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20841 from HyukjinKwon/spark-conf-get.

(cherry picked from commit 61487b3)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
…ng streaming tests

## What changes were proposed in this pull request?

The testTransformerByInterceptingException failed to catch the expected message on 2.3 during streaming tests as the feature generated message is not at the direct caused by exception but even one level deeper.

## How was this patch tested?

Running the unit tests.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes #20852 from attilapiros/SPARK-23728.
…ed fast

## What changes were proposed in this pull request?

Yarn throws the following exception in cluster mode when the application is really small:

```
18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
	at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
	at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
	at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
	at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
	... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )
```

Example application:

```
object ExampleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExampleApp")
    val sc = new SparkContext(conf)
    try {
      // Do nothing
    } finally {
      sc.stop()
    }
  }
```

This PR pauses user class thread after `SparkContext` created and keeps it so until application master initialises properly.

## How was this patch tested?

Automated: Existing unit tests
Manual: Application submitted into small cluster

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20807 from gaborgsomogyi/SPARK-23660.

(cherry picked from commit 5f4deff)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
… in SHS

## What changes were proposed in this pull request?

SHS is using a relative path for the REST API call to get the list of the application is a relative path call. In case of the SHS being consumed through a proxy, it can be an issue if the path doesn't end with a "/".

Therefore, we should use an absolute path for the REST call as it is done for all the other resources.

## How was this patch tested?

manual tests
Before the change:
![screen shot 2018-03-10 at 4 22 02 pm](https://user-images.githubusercontent.com/8821783/37244190-8ccf9d40-2485-11e8-8fa9-345bc81472fc.png)

After the change:
![screen shot 2018-03-10 at 4 36 34 pm 1](https://user-images.githubusercontent.com/8821783/37244201-a1922810-2485-11e8-8856-eeab2bf5e180.png)

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20847 from mgaido91/SPARK-23644_2.3.
…where possible

## What changes were proposed in this pull request?

This PR backports #20830 to reduce the diff against master and restore the default value back in PySpark tests.

d6632d1 added an useful util. This backport extracts and brings this util:

```python
contextmanager
def sql_conf(self, pairs):
    ...
```

to allow configuration set/unset within a block:

```python
with self.sql_conf({"spark.blah.blah.blah", "blah"})
    # test codes
```

This PR proposes to use this util where possible in PySpark tests.

Note that there look already few places affecting tests without restoring the original value back in unittest classes.

## How was this patch tested?

Likewise, manually tested via:

```
./run-tests --modules=pyspark-sql --python-executables=python2
./run-tests --modules=pyspark-sql --python-executables=python3
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20863 from HyukjinKwon/backport-20830.
## What changes were proposed in this pull request?

The mapping of UTF-8 char's first byte to char's size doesn't cover whole range 0-255. It is defined only for 0-253:
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L60-L65
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L190

If the first byte of a char is 253-255, IndexOutOfBoundsException is thrown. Besides of that values for 244-252 are not correct according to recent unicode standard for UTF-8: http://www.unicode.org/versions/Unicode10.0.0/UnicodeStandard-10.0.pdf

As a consequence of the exception above, the length of input string in UTF-8 encoding cannot be calculated if the string contains chars started from 253 code. It is visible on user's side as for example crashing of schema inferring of csv file which contains such chars but the file can be read if the schema is specified explicitly or if the mode set to multiline.

The proposed changes build correct mapping of first byte of UTF-8 char to its size (now it covers all cases) and skip disallowed chars (counts it as one octet).

## How was this patch tested?

Added a test and a file with a char which is disallowed in UTF-8 - 0xFF.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #20796 from MaxGekk/skip-wrong-utf8-chars.

(cherry picked from commit 5e7bc2a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
To fix `scala.MatchError` in `literals.sql.out`, this pr added an entry for `CalendarIntervalType` in `QueryExecution.toHiveStructString`.

## How was this patch tested?
Existing tests and added tests in `literals.sql`

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #20872 from maropu/FixIntervalTests.

(cherry picked from commit 98d0ea3)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Output metrics were not filled when parquet sink used.

This PR fixes this problem by passing a `BasicWriteJobStatsTracker` in `FileStreamSink`.

## How was this patch tested?

Additional unit test added.

Author: Gabor Somogyi <gabor.g.somogyi@gmail.com>

Closes #20745 from gaborgsomogyi/SPARK-23288.

(cherry picked from commit 918c7e9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jerryshao and others added 12 commits August 14, 2018 02:55
## What changes were proposed in this pull request?

This PR fixes the an example for `to_json` in doc and function description.

- http://spark.apache.org/docs/2.3.0/api/sql/#to_json
- `describe function extended`

## How was this patch tested?

Pass the Jenkins with the updated test.

Closes #22096 from dongjoon-hyun/minor_json.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
(cherry picked from commit e2ab7de)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

The introduction of `AnalysisBarrier` prevented `FixNullability` to go through all the nodes. This introduced a bug, which can lead to wrong results, as the nullability of the output attributes of an outer join can be wrong.

The PR makes `FixNullability` going through the `AnalysisBarrier`s.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #22102 from mgaido91/SPARK-25051.
## What changes were proposed in this pull request?

Put annotation args in one line, or API doc generation will fail.

~~~
[error] /Users/meng/src/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala:1559: annotation argument needs to be a constant; found: "_FUNC_(expr) - Returns the character length of string data or number of bytes of ".+("binary data. The length of string data includes the trailing spaces. The length of binary ").+("data includes binary zeros.")
[error]     "binary data. The length of string data includes the trailing spaces. The length of binary " +
[error]                                                                                                  ^
[info] No documentation generated with unsuccessful compiler run
[error] one error found
[error] (catalyst/compile:doc) Scaladoc generation failed
[error] Total time: 27 s, completed Aug 17, 2018 3:20:08 PM
~~~

## How was this patch tested?

sbt catalyst/compile:doc passed

Closes #22137 from mengxr/minor-doc-fix.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
(cherry picked from commit f454d52)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

[SPARK-25144](https://issues.apache.org/jira/browse/SPARK-25144) reports memory leaks on Apache Spark 2.0.2 ~ 2.3.2-RC5.

```scala
scala> case class Foo(bar: Option[String])
scala> val ds = List(Foo(Some("bar"))).toDS
scala> val result = ds.flatMap(_.bar).distinct
scala> result.rdd.isEmpty
18/08/19 23:01:54 WARN Executor: Managed memory leak detected; size = 8650752 bytes, TID = 125
res0: Boolean = false
```

This is a backport of cloud-fan 's #21738 which is a single commit among 3 commits of SPARK-21743. In addition, I added a test case to prevent regressions in branch-2.3 and branch-2.2. Although SPARK-21743 is reverted due to regression, this subpatch can go to branch-2.3 and branch-2.2. This will be merged as cloud-fan 's commit.

## How was this patch tested?

Pass the jenkins with a newly added test case.

Closes #22150 from dongjoon-hyun/SPARK-25144.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
When j is 0, log(j+1) will be 0, and this leads to division by 0 issue.

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22090 from yueguoguo/patch-1.

Authored-by: Zhang Le <yueguoguo@users.noreply.github.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 219ed7b)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…en two words is divisible by Integer.MAX_VALUE.

#22079 (comment) It is possible for two objects to be unequal and yet we consider them as equal with this code, if the long values are separated by Int.MaxValue.
This PR fixes the issue.

Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
(cherry picked from commit 4fb96e5)
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
…ild failure

## What changes were proposed in this pull request?

Fix RecordBinaryComparatorSuite build failure

## How was this patch tested?

Existing tests.

Closes #22166 from jiangxb1987/SPARK-25114-2.3.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
…ions

Closes #22195 from squito/SPARK-25205.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
(cherry picked from commit 0ce09ec)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
## What changes were proposed in this pull request?

`parallelize` uses integer multiplication to determine the split indices. It might cause integer overflow.

## How was this patch tested?

unit test

Closes #22225 from mengxr/SPARK-25234.

Authored-by: Xiangrui Meng <meng@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
(cherry picked from commit 9714fa5)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
…ues backport to 2.3

## What changes were proposed in this pull request?
In feature.py, VectorSizeHint setSize and getSize don't return value. Add return.

(Please fill in changes proposed in this fix)

## How was this patch tested?

Unit Test added

Closes #22228 from huaxingao/spark-25124-2.3.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@kiszk
Copy link
Member

kiszk commented Aug 27, 2018

@ArunkumarRamanan would it be possible to close this? probably something is wrong.

bersprockets and others added 14 commits August 27, 2018 16:08
…umn in parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list and path list once for each column. Therefore, it indirectly iterates 2\*colCount\*colCount times for each parquet file.

This inefficiency impacts jobs that read parquet-backed tables with many columns and many files. Jobs that read tables with few columns or few files are not impacted.

This PR changes initializeInternal so that it builds each list only once.

I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There are roughly one matching row for every 425 rows, and the matching rows are sprinkled pretty evenly throughout the table (that is, every page for column <code>id1</code> has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98m files:

master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%

600 columns 10 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests

Closes #22188 from bersprockets/SPARK-25164.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Before:

![wx20180630-155537](https://user-images.githubusercontent.com/1438757/42123357-2c2e2d84-7c83-11e8-8abd-1c2860f38783.png)

After:

![wx20180630-155604](https://user-images.githubusercontent.com/1438757/42123359-32fae990-7c83-11e8-8a7b-cdcee94f9123.png)

## How was this patch tested?

Manual tests.

Author: Stan Zhai <mail@stanzhai.site>

Closes #21680 from stanzhai/fix-dag-graph.

(cherry picked from commit 772060d)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?

R tests require `testthat` v1.0.2. In the PR, I described how to install the version in the section http://spark.apache.org/docs/latest/building-spark.html#running-r-tests.

Closes #22272 from MaxGekk/r-testthat-doc.

Authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
…etion.

Spark scheduler can hang when fetch failures, executor lost, task running on lost executor, and multiple stage attempts. To fix this we change to always unregister the pending partition on task completion.

this PR is actually reverting the change in SPARK-19263, so that it always does shuffleStage.pendingPartitions -= task.partitionId.   The change in SPARK-23433, should fix the issue originally from SPARK-19263.

Unit tests.  The condition happens on a race which I haven't reproduced on a real customer, just see it sometimes on customers jobs in a real cluster.
I am also working on adding spark scheduler integration tests.

Closes #21976 from tgravescs/SPARK-24909.

Authored-by: Thomas Graves <tgraves@unharmedunarmed.corp.ne1.yahoo.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit ec3e998)
Signed-off-by: Thomas Graves <tgraves@apache.org>
…askSchedulerImpl

Running a large Spark job with speculation turned on was causing executor heartbeats to time out on the driver end after sometime and eventually, after hitting the max number of executor failures, the job would fail.

## What changes were proposed in this pull request?

The main reason for the heartbeat timeouts was that the heartbeat-receiver-event-loop-thread was blocked waiting on the TaskSchedulerImpl object which was being held by one of the dispatcher-event-loop threads executing the method dequeueSpeculativeTasks() in TaskSetManager.scala. On further analysis of the heartbeat receiver method executorHeartbeatReceived() in TaskSchedulerImpl class, we found out that instead of waiting to acquire the lock on the TaskSchedulerImpl object, we can remove that lock and make the operations to the global variables inside the code block to be atomic. The block of code in that method only uses  one global HashMap taskIdToTaskSetManager. Making that map a ConcurrentHashMap, we are ensuring atomicity of operations and speeding up the heartbeat receiver thread operation.

## How was this patch tested?

Screenshots of the thread dump have been attached below:
**heartbeat-receiver-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 19 57 am" src="https://user-images.githubusercontent.com/22228190/44593413-e25df780-a788-11e8-9520-176a18401a59.png">

**dispatcher-event-loop-thread:**

<img width="1409" alt="screen shot 2018-08-24 at 9 21 56 am" src="https://user-images.githubusercontent.com/22228190/44593484-13d6c300-a789-11e8-8d88-34b1d51d4541.png">

Closes #22221 from pgandhi999/SPARK-25231.

Authored-by: pgandhi <pgandhi@oath.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 559b899)
Signed-off-by: Thomas Graves <tgraves@apache.org>
…put names

Port #22320 to branch-2.3
## What changes were proposed in this pull request?

Let's see the follow example:
```
        val location = "/tmp/t"
        val df = spark.range(10).toDF("id")
        df.write.format("parquet").saveAsTable("tbl")
        spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
        spark.sql(s"CREATE TABLE tbl2(ID long) USING parquet location $location")
        spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
        println(spark.read.parquet(location).schema)
        spark.table("tbl2").show()
```
The output column name in schema will be `id` instead of `ID`, thus the last query shows nothing from `tbl2`.
By enabling the debug message we can see that the output naming is changed from `ID` to `id`, and then the `outputColumns` in `InsertIntoHadoopFsRelationCommand` is changed in `RemoveRedundantAliases`.
![wechatimg5](https://user-images.githubusercontent.com/1097932/44947871-6299f200-ae46-11e8-9c96-d45fe368206c.jpeg)

![wechatimg4](https://user-images.githubusercontent.com/1097932/44947866-56ae3000-ae46-11e8-8923-8b3bbe060075.jpeg)

**To guarantee correctness**, we should change the output columns from `Seq[Attribute]` to `Seq[String]` to avoid its names being replaced by optimizer.

I will fix project elimination related rules in #22311 after this one.

## How was this patch tested?

Unit test.

Closes #22346 from gengliangwang/portSchemaOutputName2.3.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

Add value length check in `_create_row`, forbid extra value for custom Row in PySpark.

## How was this patch tested?

New UT in pyspark-sql

Closes #22140 from xuanyuanking/SPARK-25072.

Lead-authored-by: liyuanjian <liyuanjian@baidu.com>
Co-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
(cherry picked from commit c84bc40)
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
backport #22112 to 2.3

-------

An alternative fix for #21698

When Spark rerun tasks for an RDD, there are 3 different behaviors:
1. determinate. Always return the same result with same order when rerun.
2. unordered. Returns same data set in random order when rerun.
3. indeterminate. Returns different result when rerun.

Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised.

However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed.

If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change.

If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set.

This PR fixed the failure handling for `repartition`, to avoid correctness issues.

For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages.

**future improvement:**
1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341
2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342
3. We should provide public API to allow users to tag the random level of the RDD's computing function.

a new test case

Closes #22354 from cloud-fan/repartition.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
How to reproduce permission issue:
```sh
# build spark
./dev/make-distribution.sh --name SPARK-25330 --tgz  -Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn

tar -zxf spark-2.4.0-SNAPSHOT-bin-SPARK-25330.tar && cd spark-2.4.0-SNAPSHOT-bin-SPARK-25330
export HADOOP_PROXY_USER=user_a
bin/spark-sql

export HADOOP_PROXY_USER=user_b
bin/spark-sql
```
```java
Exception in thread "main" java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=EXECUTE, inode="/tmp/hive-$%7Buser.name%7D/user_b/668748f2-f6c5-4325-a797-fd0a7ee7f4d4":user_b:hadoop:drwx------
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
```

The issue occurred in this commit: apache/hadoop@feb886f. This pr revert Hadoop 2.7 to 2.7.3 to avoid this issue.

## How was this patch tested?
unit tests and manual tests.

Closes #22327 from wangyum/SPARK-25330.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit b0ada7d)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…tage objects in liveStages until all tasks are complete

The problem occurs because stage object is removed from liveStages in
AppStatusListener onStageCompletion. Because of this any onTaskEnd event
received after onStageCompletion event do not update stage metrics.

The fix is to retain stage objects in liveStages until all tasks are complete.

1. Fixed the reproducible example posted in the JIRA
2. Added unit test

Closes #22209 from ankuriitg/ankurgupta/SPARK-24415.

Authored-by: ankurgupta <ankur.gupta@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 39a02d8)
Signed-off-by: Thomas Graves <tgraves@apache.org>
How to reproduce:
```scala
val df1 = spark.createDataFrame(Seq(
   (1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show

+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|  1|  1|null|  0|
|  1|  1|null|  1|
+---+---+----+---+
```
`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before #19201, but it is transformed to `(c#10 = null)` since #20155. This pr revert it to `(null <=> c#10)` to fix this issue.

unit tests

Closes #22368 from wangyum/SPARK-25368.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit 77c9964)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
…and output schema in Parquet issue

## What changes were proposed in this pull request?

Backport #22359 to branch-2.3.

## How was this patch tested?

unit tests

Closes #22387 from wangyum/SPARK-25313-FOLLOW-UP-branch-2.3.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
We will update block info coming from executors, at the timing like caching a RDD. However, when removing RDDs with unpersisting, we don't ask to update block info. So the block info is not updated.

We can fix this with few options:

1. Ask to update block info when unpersisting

This is simplest but changes driver-executor communication a bit.

2. Update block info when processing the event of unpersisting RDD

We send a `SparkListenerUnpersistRDD` event when unpersisting RDD. When processing this event, we can update block info of the RDD. This only changes event processing code so the risk seems to be lower.

Currently this patch takes option 2 for lower risk. If we agree first option has no risk, we can change to it.

Unit tests.

Closes #22341 from viirya/SPARK-24889.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 14f3ad2)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
@asfgit asfgit closed this in 9d9601a Sep 11, 2018
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.