Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE pandas_udf with keyword args #20798

Closed
wants to merge 30 commits into from

Conversation

mstewart141
Copy link
Contributor

@mstewart141 mstewart141 commented Mar 11, 2018

What changes were proposed in this pull request?

Add documentation about the limitations of pandas_udf with keyword arguments and related concepts, like functools.partial fn objects.

@mstewart141
Copy link
Contributor Author

[WIP]
cc @HyukjinKwon
👍

i'd love to run tests here to make sure i haven't broken something. i will update pr with new tests once i set up testing better on my local box.

@HyukjinKwon
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Mar 11, 2018

Test build #88165 has finished for PR 20798 at commit 5ec810a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

@mstewart141, how about we start this by documenting that?

@mstewart141 mstewart141 changed the title [SPARK-23645][PYTHON] Allow python udfs to be called with keyword arguments [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE pandas_udf with keyword args Mar 18, 2018
@mstewart141
Copy link
Contributor Author

mstewart141 commented Mar 18, 2018

@HyukjinKwon thanks again; appreciate your guidance. I've updated this PR to add documentation. I dug pretty deep into the bigger issue around kwargs/partial functions, and you can see what i did in the commit:
969f907

Basically, throughout the udf and arrow serialization code there is no notion of kwargs as supported, making it more challenging than I anticipated to wire everything together. Definitely not impossible, but not a small undertaking either.

The goal:

Allow kwargs for pandas_udfs for functions in python3, but not py2 or partial fns/callables

Is actually very easily accomplished, in so far as that is valuable in and of itself.

However, trying to add py2 support is fool's errand. Partial fn support for pandas_udfs (already supported for UDF) is attainable, but nontrivial.

@SparkQA
Copy link

SparkQA commented Mar 18, 2018

Test build #88360 has finished for PR 20798 at commit 65de58f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind if I ask to leave simple examples for both pandas_udf and udf with produced error messages in PR description? I believe many guys are interested in this feature and want to test this ( like me :-) ).

Also, thanks for logging the details here and JIRA. It should be helpful in the future when we should revisit this to allow.

LGTM otherwise.


Currently, for `pandas_udf` it is not possible to pass keyword arguments to a function. The wrapped
function must also not be a `functools.partial` function object. Functions with a zero-length argument
list are unsupported, but can be approximated via a single-argument udf which ignores the passed arg.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, hm .. I think we are fine to leave this note out of the SQL programing guide for now .. Arguably this seems rather a corner case (just given my monitoring mailing list and JIRAs so far). I personally have promoted guys to leave some notes about key points only.

@@ -2155,6 +2155,8 @@ def udf(f=None, returnType=StringType()):
in boolean expressions and it ends up with being executed all internally. If the functions
can fail on special rows, the workaround is to incorporate the condition into the functions.

.. note:: The user-defined functions may not take keyword arguments.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to saymay not take -> does not take.

@mstewart141
Copy link
Contributor Author

all that makes sense; i will update.

xysun and others added 21 commits March 24, 2018 10:37
## What changes were proposed in this pull request?

The error message ```s"""Field "$name" does not exist."""``` is thrown when looking up an unknown field in StructType. In the error message, we should also contain the information about which columns/fields exist in this struct.

## How was this patch tested?

Added new unit tests.

Note: I created a new `StructTypeSuite.scala` as I couldn't find an existing suite that's suitable to place these tests. I may be missing something so feel free to propose new locations.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xiayun Sun <xiayunsun@gmail.com>

Closes apache#20649 from xysun/SPARK-23462.
## What changes were proposed in this pull request?

This change initializes BUILD_ARGS to an empty array when $SPARK_HOME/RELEASE exists.

In function build, "local BUILD_ARGS" effectively creates an array of one element where the first and only element is an empty string, so "${BUILD_ARGS[]}" expands to "" and passes an extra argument to docker.

Setting BUILD_ARGS to an empty array makes "${BUILD_ARGS[]}" expand to nothing.

## How was this patch tested?

Manually tested.

$ cat RELEASE
Spark 2.3.0 (git revision a0d7949) built for Hadoop 2.7.3
Build flags: -Phadoop-2.7 -Phive -Phive-thriftserver -Pkafka-0-8 -Pmesos -Pyarn -Pkubernetes -Pflume -Psparkr -DzincPort=3036
$ ./bin/docker-image-tool.sh -m t testing build
Sending build context to Docker daemon  256.4MB
...

vanzin

Author: Jooseong Kim <jooseong@pinterest.com>

Closes apache#20791 from jooseong/SPARK-23618.
## What changes were proposed in this pull request?

The PR adds the option to specify a distance measure in BisectingKMeans. Moreover, it introduces the ability to use the cosine distance measure in it.

## How was this patch tested?

added UTs + existing UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes apache#20600 from mgaido91/SPARK-23412.
…ayInputs() on big endian platform, too

## What changes were proposed in this pull request?

This PR enables assertions in `XXH64Suite.testKnownByteArrayInputs()` on big endian platform, too. The current implementation performs them only on little endian platform. This PR increase test coverage of big endian platform.

## How was this patch tested?

Updated `XXH64Suite`
Tested on big endian platform using JIT compiler or interpreter `-Xint`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#20804 from kiszk/SPARK-23656.
…lationTest`

## What changes were proposed in this pull request?

This PR fixes a minor issue in `HadoopFsRelationTest`, that you should create table using `dataSourceName` instead of `parquet`. The issue won't affect the correctness, but it will generate wrong error message in case the test fails.

## How was this patch tested?

Exsiting tests.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes apache#20780 from jiangxb1987/dataSourceName.
…osed

## What changes were proposed in this pull request?

![2018-03-07_121010](https://user-images.githubusercontent.com/24823338/37073232-922e10d2-2200-11e8-8172-6e03aa984b39.png)

when the hive session closed, we should also cleanup the .pipeout file.

## How was this patch tested?

Added test cases.

Author: zuotingbing <zuo.tingbing9@zte.com.cn>

Closes apache#20702 from zuotingbing/SPARK-23547.
…d runtime error for a large query

## What changes were proposed in this pull request?

This PR fixes runtime error regarding a large query when a generated code has split classes. The issue is `append()`, `stopEarly()`, and other methods are not accessible from split classes that are not subclasses of `BufferedRowIterator`.
This PR fixes this issue by making them `public`.

Before applying the PR, we see the following exception by running the attached program with `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1`.
```
  test("SPARK-23598") {
    // When set -1 to CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD, an exception is thrown
    val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
    df_pet_age.groupBy("name").avg("age").show()
  }
```

Exception:
```
19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
...
```

Generated code (line 195 calles `stopEarly()`).
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private double agg_bufValue;
/* 012 */   private boolean agg_bufIsNull1;
/* 013 */   private long agg_bufValue1;
/* 014 */   private agg_FastHashMap agg_fastHashMap;
/* 015 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter;
/* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
/* 018 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */   private scala.collection.Iterator inputadapter_input;
/* 020 */   private boolean agg_agg_isNull11;
/* 021 */   private boolean agg_agg_isNull25;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */   private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */
/* 034 */     agg_fastHashMap = new agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */     agg_hashMap = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */     agg_mutableStateArray1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0], 32);
/* 039 */     agg_mutableStateArray2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0], 1);
/* 040 */     agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */     agg_mutableStateArray1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1], 32);
/* 042 */     agg_mutableStateArray2[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1], 3);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   public class agg_FastHashMap {
/* 047 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 048 */     private int[] buckets;
/* 049 */     private int capacity = 1 << 16;
/* 050 */     private double loadFactor = 0.5;
/* 051 */     private int numBuckets = (int) (capacity / loadFactor);
/* 052 */     private int maxSteps = 2;
/* 053 */     private int numRows = 0;
/* 054 */     private org.apache.spark.sql.types.StructType keySchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[1] /* keyName */), org.apache.spark.sql.types.DataTypes.StringType);
/* 055 */     private org.apache.spark.sql.types.StructType valueSchema = new org.apache.spark.sql.types.StructType().add(((java.lang.String) references[2] /* keyName */), org.apache.spark.sql.types.DataTypes.DoubleType)
/* 056 */     .add(((java.lang.String) references[3] /* keyName */), org.apache.spark.sql.types.DataTypes.LongType);
/* 057 */     private Object emptyVBase;
/* 058 */     private long emptyVOff;
/* 059 */     private int emptyVLen;
/* 060 */     private boolean isBatchFull = false;
/* 061 */
/* 062 */     public agg_FastHashMap(
/* 063 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 064 */       InternalRow emptyAggregationBuffer) {
/* 065 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 066 */       .allocate(keySchema, valueSchema, taskMemoryManager, capacity);
/* 067 */
/* 068 */       final UnsafeProjection valueProjection = UnsafeProjection.create(valueSchema);
/* 069 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 070 */
/* 071 */       emptyVBase = emptyBuffer;
/* 072 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 073 */       emptyVLen = emptyBuffer.length;
/* 074 */
/* 075 */       buckets = new int[numBuckets];
/* 076 */       java.util.Arrays.fill(buckets, -1);
/* 077 */     }
/* 078 */
/* 079 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(UTF8String agg_key) {
/* 080 */       long h = hash(agg_key);
/* 081 */       int step = 0;
/* 082 */       int idx = (int) h & (numBuckets - 1);
/* 083 */       while (step < maxSteps) {
/* 084 */         // Return bucket index if it's either an empty slot or already contains the key
/* 085 */         if (buckets[idx] == -1) {
/* 086 */           if (numRows < capacity && !isBatchFull) {
/* 087 */             // creating the unsafe for new entry
/* 088 */             UnsafeRow agg_result = new UnsafeRow(1);
/* 089 */             org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder
/* 090 */             = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 091 */               32);
/* 092 */             org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
/* 093 */             = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 094 */               agg_holder,
/* 095 */               1);
/* 096 */             agg_holder.reset(); //TODO: investigate if reset or zeroout are actually needed
/* 097 */             agg_rowWriter.zeroOutNullBytes();
/* 098 */             agg_rowWriter.write(0, agg_key);
/* 099 */             agg_result.setTotalSize(agg_holder.totalSize());
/* 100 */             Object kbase = agg_result.getBaseObject();
/* 101 */             long koff = agg_result.getBaseOffset();
/* 102 */             int klen = agg_result.getSizeInBytes();
/* 103 */
/* 104 */             UnsafeRow vRow
/* 105 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 106 */             if (vRow == null) {
/* 107 */               isBatchFull = true;
/* 108 */             } else {
/* 109 */               buckets[idx] = numRows++;
/* 110 */             }
/* 111 */             return vRow;
/* 112 */           } else {
/* 113 */             // No more space
/* 114 */             return null;
/* 115 */           }
/* 116 */         } else if (equals(idx, agg_key)) {
/* 117 */           return batch.getValueRow(buckets[idx]);
/* 118 */         }
/* 119 */         idx = (idx + 1) & (numBuckets - 1);
/* 120 */         step++;
/* 121 */       }
/* 122 */       // Didn't find it
/* 123 */       return null;
/* 124 */     }
/* 125 */
/* 126 */     private boolean equals(int idx, UTF8String agg_key) {
/* 127 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 128 */       return (row.getUTF8String(0).equals(agg_key));
/* 129 */     }
/* 130 */
/* 131 */     private long hash(UTF8String agg_key) {
/* 132 */       long agg_hash = 0;
/* 133 */
/* 134 */       int agg_result = 0;
/* 135 */       byte[] agg_bytes = agg_key.getBytes();
/* 136 */       for (int i = 0; i < agg_bytes.length; i++) {
/* 137 */         int agg_hash1 = agg_bytes[i];
/* 138 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 + (agg_result << 6) + (agg_result >>> 2);
/* 139 */       }
/* 140 */
/* 141 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result + (agg_hash << 6) + (agg_hash >>> 2);
/* 142 */
/* 143 */       return agg_hash;
/* 144 */     }
/* 145 */
/* 146 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 147 */       return batch.rowIterator();
/* 148 */     }
/* 149 */
/* 150 */     public void close() {
/* 151 */       batch.close();
/* 152 */     }
/* 153 */
/* 154 */   }
/* 155 */
/* 156 */   protected void processNext() throws java.io.IOException {
/* 157 */     if (!agg_initAgg) {
/* 158 */       agg_initAgg = true;
/* 159 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 160 */       agg_nestedClassInstance1.agg_doAggregateWithKeys();
/* 161 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg) / 1000000);
/* 162 */     }
/* 163 */
/* 164 */     // output the result
/* 165 */
/* 166 */     while (agg_fastHashMapIter.next()) {
/* 167 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_fastHashMapIter.getKey();
/* 168 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_fastHashMapIter.getValue();
/* 169 */       wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 170 */
/* 171 */       if (shouldStop()) return;
/* 172 */     }
/* 173 */     agg_fastHashMap.close();
/* 174 */
/* 175 */     while (agg_mapIter.next()) {
/* 176 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 177 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
/* 178 */       wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey, agg_aggBuffer);
/* 179 */
/* 180 */       if (shouldStop()) return;
/* 181 */     }
/* 182 */
/* 183 */     agg_mapIter.close();
/* 184 */     if (agg_sorter == null) {
/* 185 */       agg_hashMap.free();
/* 186 */     }
/* 187 */   }
/* 188 */
/* 189 */   private wholestagecodegen_NestedClass wholestagecodegen_nestedClassInstance = new wholestagecodegen_NestedClass();
/* 190 */   private agg_NestedClass1 agg_nestedClassInstance1 = new agg_NestedClass1();
/* 191 */   private agg_NestedClass agg_nestedClassInstance = new agg_NestedClass();
/* 192 */
/* 193 */   private class agg_NestedClass1 {
/* 194 */     private void agg_doAggregateWithKeys() throws java.io.IOException {
/* 195 */       while (inputadapter_input.hasNext() && !stopEarly()) {
/* 196 */         InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 197 */         int inputadapter_value = inputadapter_row.getInt(0);
/* 198 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 199 */         UTF8String inputadapter_value1 = inputadapter_isNull1 ?
/* 200 */         null : (inputadapter_row.getUTF8String(1));
/* 201 */
/* 202 */         agg_nestedClassInstance.agg_doConsume(inputadapter_row, inputadapter_value, inputadapter_value1, inputadapter_isNull1);
/* 203 */         if (shouldStop()) return;
/* 204 */       }
/* 205 */
/* 206 */       agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 207 */       agg_mapIter = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap, agg_sorter, ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* avgHashProbe */));
/* 208 */
/* 209 */     }
/* 210 */
/* 211 */   }
/* 212 */
/* 213 */   private class wholestagecodegen_NestedClass {
/* 214 */     private void agg_doAggregateWithKeysOutput(UnsafeRow agg_keyTerm, UnsafeRow agg_bufferTerm)
/* 215 */     throws java.io.IOException {
/* 216 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numOutputRows */).add(1);
/* 217 */
/* 218 */       boolean agg_isNull35 = agg_keyTerm.isNullAt(0);
/* 219 */       UTF8String agg_value37 = agg_isNull35 ?
/* 220 */       null : (agg_keyTerm.getUTF8String(0));
/* 221 */       boolean agg_isNull36 = agg_bufferTerm.isNullAt(0);
/* 222 */       double agg_value38 = agg_isNull36 ?
/* 223 */       -1.0 : (agg_bufferTerm.getDouble(0));
/* 224 */       boolean agg_isNull37 = agg_bufferTerm.isNullAt(1);
/* 225 */       long agg_value39 = agg_isNull37 ?
/* 226 */       -1L : (agg_bufferTerm.getLong(1));
/* 227 */
/* 228 */       agg_mutableStateArray1[1].reset();
/* 229 */
/* 230 */       agg_mutableStateArray2[1].zeroOutNullBytes();
/* 231 */
/* 232 */       if (agg_isNull35) {
/* 233 */         agg_mutableStateArray2[1].setNullAt(0);
/* 234 */       } else {
/* 235 */         agg_mutableStateArray2[1].write(0, agg_value37);
/* 236 */       }
/* 237 */
/* 238 */       if (agg_isNull36) {
/* 239 */         agg_mutableStateArray2[1].setNullAt(1);
/* 240 */       } else {
/* 241 */         agg_mutableStateArray2[1].write(1, agg_value38);
/* 242 */       }
/* 243 */
/* 244 */       if (agg_isNull37) {
/* 245 */         agg_mutableStateArray2[1].setNullAt(2);
/* 246 */       } else {
/* 247 */         agg_mutableStateArray2[1].write(2, agg_value39);
/* 248 */       }
/* 249 */       agg_mutableStateArray[1].setTotalSize(agg_mutableStateArray1[1].totalSize());
/* 250 */       append(agg_mutableStateArray[1]);
/* 251 */
/* 252 */     }
/* 253 */
/* 254 */   }
/* 255 */
/* 256 */   private class agg_NestedClass {
/* 257 */     private void agg_doConsume(InternalRow inputadapter_row, int agg_expr_0, UTF8String agg_expr_1, boolean agg_exprIsNull_1) throws java.io.IOException {
/* 258 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 259 */       UnsafeRow agg_fastAggBuffer = null;
/* 260 */
/* 261 */       if (true) {
/* 262 */         if (!agg_exprIsNull_1) {
/* 263 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 264 */             agg_expr_1);
/* 265 */         }
/* 266 */       }
/* 267 */       // Cannot find the key in fast hash map, try regular hash map.
/* 268 */       if (agg_fastAggBuffer == null) {
/* 269 */         // generate grouping key
/* 270 */         agg_mutableStateArray1[0].reset();
/* 271 */
/* 272 */         agg_mutableStateArray2[0].zeroOutNullBytes();
/* 273 */
/* 274 */         if (agg_exprIsNull_1) {
/* 275 */           agg_mutableStateArray2[0].setNullAt(0);
/* 276 */         } else {
/* 277 */           agg_mutableStateArray2[0].write(0, agg_expr_1);
/* 278 */         }
/* 279 */         agg_mutableStateArray[0].setTotalSize(agg_mutableStateArray1[0].totalSize());
/* 280 */         int agg_value7 = 42;
/* 281 */
/* 282 */         if (!agg_exprIsNull_1) {
/* 283 */           agg_value7 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1.getBaseObject(), agg_expr_1.getBaseOffset(), agg_expr_1.numBytes(), agg_value7);
/* 284 */         }
/* 285 */         if (true) {
/* 286 */           // try to get the buffer from hash map
/* 287 */           agg_unsafeRowAggBuffer =
/* 288 */           agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0], agg_value7);
/* 289 */         }
/* 290 */         // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 291 */         // aggregation after processing all input rows.
/* 292 */         if (agg_unsafeRowAggBuffer == null) {
/* 293 */           if (agg_sorter == null) {
/* 294 */             agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
/* 295 */           } else {
/* 296 */             agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 297 */           }
/* 298 */
/* 299 */           // the hash map had be spilled, it should have enough memory now,
/* 300 */           // try to allocate buffer again.
/* 301 */           agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(
/* 302 */             agg_mutableStateArray[0], agg_value7);
/* 303 */           if (agg_unsafeRowAggBuffer == null) {
/* 304 */             // failed to allocate the first page
/* 305 */             throw new OutOfMemoryError("No enough memory for aggregation");
/* 306 */           }
/* 307 */         }
/* 308 */
/* 309 */       }
/* 310 */
/* 311 */       if (agg_fastAggBuffer != null) {
/* 312 */         // common sub-expressions
/* 313 */         boolean agg_isNull21 = false;
/* 314 */         long agg_value23 = -1L;
/* 315 */         if (!false) {
/* 316 */           agg_value23 = (long) agg_expr_0;
/* 317 */         }
/* 318 */         // evaluate aggregate function
/* 319 */         boolean agg_isNull23 = true;
/* 320 */         double agg_value25 = -1.0;
/* 321 */
/* 322 */         boolean agg_isNull24 = agg_fastAggBuffer.isNullAt(0);
/* 323 */         double agg_value26 = agg_isNull24 ?
/* 324 */         -1.0 : (agg_fastAggBuffer.getDouble(0));
/* 325 */         if (!agg_isNull24) {
/* 326 */           agg_agg_isNull25 = true;
/* 327 */           double agg_value27 = -1.0;
/* 328 */           do {
/* 329 */             boolean agg_isNull26 = agg_isNull21;
/* 330 */             double agg_value28 = -1.0;
/* 331 */             if (!agg_isNull21) {
/* 332 */               agg_value28 = (double) agg_value23;
/* 333 */             }
/* 334 */             if (!agg_isNull26) {
/* 335 */               agg_agg_isNull25 = false;
/* 336 */               agg_value27 = agg_value28;
/* 337 */               continue;
/* 338 */             }
/* 339 */
/* 340 */             boolean agg_isNull27 = false;
/* 341 */             double agg_value29 = -1.0;
/* 342 */             if (!false) {
/* 343 */               agg_value29 = (double) 0;
/* 344 */             }
/* 345 */             if (!agg_isNull27) {
/* 346 */               agg_agg_isNull25 = false;
/* 347 */               agg_value27 = agg_value29;
/* 348 */               continue;
/* 349 */             }
/* 350 */
/* 351 */           } while (false);
/* 352 */
/* 353 */           agg_isNull23 = false; // resultCode could change nullability.
/* 354 */           agg_value25 = agg_value26 + agg_value27;
/* 355 */
/* 356 */         }
/* 357 */         boolean agg_isNull29 = false;
/* 358 */         long agg_value31 = -1L;
/* 359 */         if (!false && agg_isNull21) {
/* 360 */           boolean agg_isNull31 = agg_fastAggBuffer.isNullAt(1);
/* 361 */           long agg_value33 = agg_isNull31 ?
/* 362 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 363 */           agg_isNull29 = agg_isNull31;
/* 364 */           agg_value31 = agg_value33;
/* 365 */         } else {
/* 366 */           boolean agg_isNull32 = true;
/* 367 */           long agg_value34 = -1L;
/* 368 */
/* 369 */           boolean agg_isNull33 = agg_fastAggBuffer.isNullAt(1);
/* 370 */           long agg_value35 = agg_isNull33 ?
/* 371 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 372 */           if (!agg_isNull33) {
/* 373 */             agg_isNull32 = false; // resultCode could change nullability.
/* 374 */             agg_value34 = agg_value35 + 1L;
/* 375 */
/* 376 */           }
/* 377 */           agg_isNull29 = agg_isNull32;
/* 378 */           agg_value31 = agg_value34;
/* 379 */         }
/* 380 */         // update fast row
/* 381 */         if (!agg_isNull23) {
/* 382 */           agg_fastAggBuffer.setDouble(0, agg_value25);
/* 383 */         } else {
/* 384 */           agg_fastAggBuffer.setNullAt(0);
/* 385 */         }
/* 386 */
/* 387 */         if (!agg_isNull29) {
/* 388 */           agg_fastAggBuffer.setLong(1, agg_value31);
/* 389 */         } else {
/* 390 */           agg_fastAggBuffer.setNullAt(1);
/* 391 */         }
/* 392 */       } else {
/* 393 */         // common sub-expressions
/* 394 */         boolean agg_isNull7 = false;
/* 395 */         long agg_value9 = -1L;
/* 396 */         if (!false) {
/* 397 */           agg_value9 = (long) agg_expr_0;
/* 398 */         }
/* 399 */         // evaluate aggregate function
/* 400 */         boolean agg_isNull9 = true;
/* 401 */         double agg_value11 = -1.0;
/* 402 */
/* 403 */         boolean agg_isNull10 = agg_unsafeRowAggBuffer.isNullAt(0);
/* 404 */         double agg_value12 = agg_isNull10 ?
/* 405 */         -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
/* 406 */         if (!agg_isNull10) {
/* 407 */           agg_agg_isNull11 = true;
/* 408 */           double agg_value13 = -1.0;
/* 409 */           do {
/* 410 */             boolean agg_isNull12 = agg_isNull7;
/* 411 */             double agg_value14 = -1.0;
/* 412 */             if (!agg_isNull7) {
/* 413 */               agg_value14 = (double) agg_value9;
/* 414 */             }
/* 415 */             if (!agg_isNull12) {
/* 416 */               agg_agg_isNull11 = false;
/* 417 */               agg_value13 = agg_value14;
/* 418 */               continue;
/* 419 */             }
/* 420 */
/* 421 */             boolean agg_isNull13 = false;
/* 422 */             double agg_value15 = -1.0;
/* 423 */             if (!false) {
/* 424 */               agg_value15 = (double) 0;
/* 425 */             }
/* 426 */             if (!agg_isNull13) {
/* 427 */               agg_agg_isNull11 = false;
/* 428 */               agg_value13 = agg_value15;
/* 429 */               continue;
/* 430 */             }
/* 431 */
/* 432 */           } while (false);
/* 433 */
/* 434 */           agg_isNull9 = false; // resultCode could change nullability.
/* 435 */           agg_value11 = agg_value12 + agg_value13;
/* 436 */
/* 437 */         }
/* 438 */         boolean agg_isNull15 = false;
/* 439 */         long agg_value17 = -1L;
/* 440 */         if (!false && agg_isNull7) {
/* 441 */           boolean agg_isNull17 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 442 */           long agg_value19 = agg_isNull17 ?
/* 443 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 444 */           agg_isNull15 = agg_isNull17;
/* 445 */           agg_value17 = agg_value19;
/* 446 */         } else {
/* 447 */           boolean agg_isNull18 = true;
/* 448 */           long agg_value20 = -1L;
/* 449 */
/* 450 */           boolean agg_isNull19 = agg_unsafeRowAggBuffer.isNullAt(1);
/* 451 */           long agg_value21 = agg_isNull19 ?
/* 452 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 453 */           if (!agg_isNull19) {
/* 454 */             agg_isNull18 = false; // resultCode could change nullability.
/* 455 */             agg_value20 = agg_value21 + 1L;
/* 456 */
/* 457 */           }
/* 458 */           agg_isNull15 = agg_isNull18;
/* 459 */           agg_value17 = agg_value20;
/* 460 */         }
/* 461 */         // update unsafe row buffer
/* 462 */         if (!agg_isNull9) {
/* 463 */           agg_unsafeRowAggBuffer.setDouble(0, agg_value11);
/* 464 */         } else {
/* 465 */           agg_unsafeRowAggBuffer.setNullAt(0);
/* 466 */         }
/* 467 */
/* 468 */         if (!agg_isNull15) {
/* 469 */           agg_unsafeRowAggBuffer.setLong(1, agg_value17);
/* 470 */         } else {
/* 471 */           agg_unsafeRowAggBuffer.setNullAt(1);
/* 472 */         }
/* 473 */
/* 474 */       }
/* 475 */
/* 476 */     }
/* 477 */
/* 478 */   }
/* 479 */
/* 480 */ }
```

## How was this patch tested?

Added UT into `WholeStageCodegenSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#20779 from kiszk/SPARK-23598.
# What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:

- NGramSuite
- NormalizerSuite
- OneHotEncoderEstimatorSuite
- OneHotEncoderSuite
- PCASuite
- PolynomialExpansionSuite
- QuantileDiscretizerSuite
- RFormulaSuite
- SQLTransformerSuite
- StandardScalerSuite
- StopWordsRemoverSuite
- StringIndexerSuite
- TokenizerSuite
- RegexTokenizerSuite
- VectorAssemblerSuite
- VectorIndexerSuite
- VectorSizeHintSuite
- VectorSlicerSuite
- Word2VecSuite

# How was this patch tested?

They are unit test.

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes apache#20686 from attilapiros/SPARK-22915.
Added/corrected scaladoc for isZero on the DoubleAccumulator, CollectionAccumulator, and LongAccumulator subclasses of AccumulatorV2, particularly noting where there are requirements in addition to having a value of zero in order to return true.

## What changes were proposed in this pull request?

Three scaladoc comments are updated in AccumulatorV2.scala
No changes outside of comment blocks were made.

## How was this patch tested?

Running "sbt unidoc", fixing style errors found, and reviewing the resulting local scaladoc in firefox.

Author: smallory <s.mallory@gmail.com>

Closes apache#20790 from smallory/patch-1.
…rtOffset

## What changes were proposed in this pull request?

As discussion in apache#20675, we need add a new interface `ContinuousDataReaderFactory` to support the requirements of setting start offset in Continuous Processing.

## How was this patch tested?

Existing UT.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes apache#20689 from xuanyuanking/SPARK-23533.
## What changes were proposed in this pull request?

This PR proposes to fix the error message for Kinesis in PySpark when its jar is missing but explicitly enabled.

```bash
ENABLE_KINESIS_TESTS=1 SPARK_TESTING=1 bin/pyspark pyspark.streaming.tests
```

Before:

```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/.../spark/python/pyspark/streaming/tests.py", line 1572, in <module>
    % kinesis_asl_assembly_dir) +
NameError: name 'kinesis_asl_assembly_dir' is not defined
```

After:

```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/.../spark/python/pyspark/streaming/tests.py", line 1576, in <module>
    "You need to build Spark with 'build/sbt -Pkinesis-asl "
Exception: Failed to find Spark Streaming Kinesis assembly jar in /.../spark/external/kinesis-asl-assembly. You need to build Spark with 'build/sbt -Pkinesis-asl assembly/package streaming-kinesis-asl-assembly/assembly'or 'build/mvn -Pkinesis-asl package' before running this test.
```

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#20834 from HyukjinKwon/minor-variable.
…f memory, got 224631

## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88263/testReport
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88260/testReport
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88257/testReport
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88224/testReport

These tests all failed:
```
org.apache.spark.memory.SparkOutOfMemoryError:  Unable to acquire 262144 bytes of memory, got 224631
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:787)
at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:204)
at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:219)
...
```

This PR ignore this test.

## How was this patch tested?

N/A

Author: Yuming Wang <yumwang@ebay.com>

Closes apache#20835 from wangyum/SPARK-23598.
…tLogger

## What changes were proposed in this pull request?

Changed `Logger` in `InProcessAppHandle` to use `InProcessAppHandle` instead of `ChildProcAppHandle`

Author: Sahil Takiar <stakiar@cloudera.com>

Closes apache#20815 from sahilTakiar/master.
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#20814 from vanzin/SPARK-23671.
…parkUI and detachSparkUI functions to avoid concurrent modification issue to Jetty Handlers

Jetty handlers are dynamically attached/detached while SHS is running. But the attach and detach operations might be taking place at the same time due to the async in load/clear in Guava Cache.

## What changes were proposed in this pull request?
Add synchronization between attachSparkUI and detachSparkUI in SHS.

## How was this patch tested?
With this patch, the jetty handlers missing issue never happens again in our production cluster SHS.

Author: Ye Zhou <yezhou@linkedin.com>

Closes apache#20744 from zhouyejoe/SPARK-23608.
Clean up SparkPlanGraphWrapper objects from InMemoryStore together with cleaning up SQLExecutionUIData
existing unit test was extended to check also SparkPlanGraphWrapper object count

vanzin

Author: myroslavlisniak <acnipin@gmail.com>

Closes apache#20813 from myroslavlisniak/master.
## What changes were proposed in this pull request?

SHS is using a relative path for the REST API call to get the list of the application is a relative path call. In case of the SHS being consumed through a proxy, it can be an issue if the path doesn't end with a "/".

Therefore, we should use an absolute path for the REST call as it is done for all the other resources.

## How was this patch tested?

manual tests
Before the change:
![screen shot 2018-03-10 at 4 22 02 pm](https://user-images.githubusercontent.com/8821783/37244190-8ccf9d40-2485-11e8-8fa9-345bc81472fc.png)

After the change:
![screen shot 2018-03-10 at 4 36 34 pm 1](https://user-images.githubusercontent.com/8821783/37244201-a1922810-2485-11e8-8856-eeab2bf5e180.png)

Author: Marco Gaido <marcogaido91@gmail.com>

Closes apache#20794 from mgaido91/SPARK-23644.
…v variable set through spark.executorEnv.

## What changes were proposed in this pull request?

In the current Spark on YARN code, AM always will copy and overwrite its env variables to executors, so we cannot set different values for executors.

To reproduce issue, user could start spark-shell like:

```
./bin/spark-shell --master yarn-client --conf spark.executorEnv.SPARK_ABC=executor_val --conf  spark.yarn.appMasterEnv.SPARK_ABC=am_val
```

Then check executor env variables by

```
sc.parallelize(1 to 1).flatMap \{ i => sys.env.toSeq }.collect.foreach(println)
```

We will always get `am_val` instead of `executor_val`. So we should not let AM to overwrite specifically set executor env variables.

## How was this patch tested?

Added UT and tested in local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes apache#20799 from jerryshao/SPARK-23635.
…ark.sql.sources.default`

## What changes were proposed in this pull request?

Currently, some tests have an assumption that `spark.sql.sources.default=parquet`. In fact, that is a correct assumption, but that assumption makes it difficult to test new data source format.

This PR aims to
- Improve test suites more robust and makes it easy to test new data sources in the future.
- Test new native ORC data source with the full existing Apache Spark test coverage.

As an example, the PR uses `spark.sql.sources.default=orc` during reviews. The value should be `parquet` when this PR is accepted.

## How was this patch tested?

Pass the Jenkins with updated tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#20705 from dongjoon-hyun/SPARK-23553.
… with large number of records

## What changes were proposed in this pull request?

Omit rounding of backpressure rate. Effects:
- no batch with large number of records is created when rate from PID estimator is one
- the number of records per batch and partition is more fine-grained improving backpressure accuracy

## How was this patch tested?

This was tested by running:
- `mvn test -pl external/kafka-0-8`
- `mvn test -pl external/kafka-0-10`
- a streaming application which was suffering from the issue

JasonMWhite

The contribution is my original work and I license the work to the project under the project’s open source license

Author: Sebastian Arzt <sebastian.arzt@plista.com>

Closes apache#17774 from arzt/kafka-back-pressure.
## What changes were proposed in this pull request?
We currently can only create unsafe rows using code generation. This is a problem for situations in which code generation fails. There is no fallback, and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The implementation is modeled after `InterpretedMutableProjection`. It stores the expression results in a `GenericInternalRow`, and it then uses a conversion function to convert the `GenericInternalRow` into an `UnsafeRow`.

This PR does not implement the actual code generated to interpreted fallback logic. This will be done in a follow-up.

## How was this patch tested?
I am piggybacking on exiting `UnsafeProjection` tests, and I have added an interpreted version for each of these.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#20750 from hvanhovell/SPARK-23581.
Ricardo Martinelli de Oliveira and others added 9 commits March 24, 2018 10:37
## What changes were proposed in this pull request?

As described in SPARK-23680, entrypoint.sh returns an error code because of a command pipeline execution where it is expected in case of Openshift environments, where arbitrary UIDs are used to run containers

## How was this patch tested?

This patch was manually tested by using docker-image-toll.sh script to generate a Spark driver image and running an example against an OpenShift cluster.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Ricardo Martinelli de Oliveira <rmartine@rmartine.gru.redhat.com>

Closes apache#20822 from rimolive/rmartine-spark-23680.
…afkaConsumer

## What changes were proposed in this pull request?

CacheKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption there will be only one task using trying to read the same Kafka TopicPartition at the same time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. And any cases where this assumption may not be true, we have SparkPlan flag to disable the use of a cache. So it was up to the planner to correctly identify when it was not safe to use the cache and set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, whether there will be multiple tasks in the same query trying to read the same partition. Case in point, 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's pretty non-trivial to figure out how this leads to two tasks reading the same partition twice, possibly concurrently. And due to the non-triviality, it is hard to figure this out in the planner and set the flag to avoid the cache / consumer pool. And this can inadvertently lead to ConcurrentModificationException ,or worse, silent reading of incorrect data.

Here is a better way to design this. The planner shouldnt have to understand these low-level optimizations. Rather the consumer pool should be smart enough avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag inuse is not checked when returning a cached consumer, see [this](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403)). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that should be closed when the task is done. Then the planner does not have to have a flag to avoid reuses.

This PR is a step towards that goal. It does the following.
- There are effectively two kinds of consumer that may be generated
  - Cached consumer - this should be returned to the pool at task end
  - Non-cached consumer - this should be closed at task end
- A trait called KafkaConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply called `val consumer = KafkaConsumer.acquire` and then `consumer.release()`.
- If there is request for a consumer that is in-use, then a new consumer is generated.
- If there is a concurrent attempt of the same task, then a new consumer is generated, and the existing cached consumer is marked for close upon release.
- In addition, I renamed the classes because CachedKafkaConsumer is a misnomer given that what it returns may or may not be cached.

This PR does not remove the planner flag to avoid reuse to make this patch safe enough for merging in branch-2.3. This can be done later in master-only.

## How was this patch tested?
A new stress test that verifies it is safe to concurrently get consumers for the same partition from the consumer pool.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20767 from tdas/SPARK-23623.
…abulary list

## What changes were proposed in this pull request?

Added a class method to construct CountVectorizerModel from a list of vocabulary strings, equivalent to the Scala version.  Introduced a common param base class `_CountVectorizerParams` to allow the Python model to also own the parameters.  This now matches the Scala class hierarchy.

## How was this patch tested?

Added to CountVectorizer doctests to do a transform on a model constructed from vocab, and unit test to verify params and vocab are constructed correctly.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes apache#16770 from BryanCutler/pyspark-CountVectorizerModel-vocab_ctor-SPARK-15009.
## What changes were proposed in this pull request?

With SPARK-20236, `FileCommitProtocol.instantiate()` looks for a three argument constructor, passing in the `dynamicPartitionOverwrite` parameter. If there is no such constructor, it falls back to the classic two-arg one.

When `InsertIntoHadoopFsRelationCommand` passes down that `dynamicPartitionOverwrite` flag `to FileCommitProtocol.instantiate(`), it assumes that the instantiated protocol supports the specific requirements of dynamic partition overwrite. It does not notice when this does not hold, and so the output generated may be incorrect.

This patch changes  `FileCommitProtocol.instantiate()` so  when `dynamicPartitionOverwrite == true`, it requires the protocol implementation to have a 3-arg constructor. Classic two arg constructors are supported when it is false.

Also it adds some debug level logging for anyone trying to understand what's going on.

## How was this patch tested?

Unit tests verify that

* classes with only 2-arg constructor cannot be used with dynamic overwrite
* classes with only 2-arg constructor can be used without dynamic overwrite
* classes with 3 arg constructors can be used with both.
* the fallback to any two arg ctor takes place after the attempt to load the 3-arg ctor,
* passing in invalid class types fail as expected (regression tests on expected behavior)

Author: Steve Loughran <stevel@hortonworks.com>

Closes apache#20824 from steveloughran/stevel/SPARK-23683-protocol-instantiate.
…uce None in PySpark

## What changes were proposed in this pull request?

Scala:

```
scala> spark.conf.get("hey", null)
res1: String = null
```

```
scala> spark.conf.get("spark.sql.sources.partitionOverwriteMode", null)
res2: String = null
```

Python:

**Before**

```
>>> spark.conf.get("hey", None)
...
py4j.protocol.Py4JJavaError: An error occurred while calling o30.get.
: java.util.NoSuchElementException: hey
...
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None)
u'STATIC'
```

**After**

```
>>> spark.conf.get("hey", None) is None
True
```

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode", None) is None
True
```

*Note that this PR preserves the case below:

```
>>> spark.conf.get("spark.sql.sources.partitionOverwriteMode")
u'STATIC'
```

## How was this patch tested?

Manually tested and unit tests were added.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes apache#20841 from HyukjinKwon/spark-conf-get.
@SparkQA
Copy link

SparkQA commented Mar 24, 2018

Test build #88563 has finished for PR 20798 at commit b4f7056.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

that's fine :). let's close this one.

@mstewart141
Copy link
Contributor Author

see #20900

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet