
[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called #15219

Closed
wants to merge 40 commits

Conversation

kiszk
Member

kiszk commented Sep 23, 2016

What changes were proposed in this pull request?

Here is a design document for this change. This PR is derived from #11956 and #13899.

I am splitting this PR into multiple smaller child PRs (#15462, #15467, #15468) for ease of review.

This PR implements a new in-memory cache feature used by DataFrame.cache and Dataset.cache. The following is the basic design of this PR, which incorporates suggestions from @davies.

  1. Use ColumnarBatch with ColumnVector, which are the common data representations for columnar storage
  2. Use one of multiple compression schemes (such as RLE, int-delta, and so on) for each ColumnVector in a ColumnarBatch, depending on its data type (see the sketch after this list)
  3. Generate simple, specialized code to build each in-memory cache
  4. Generate code that directly reads data from ColumnVector for the in-memory cache via whole-stage codegen
  5. Enhance ColumnVector to keep UnsafeArrayData
  6. Use a primitive-type array for uncompressed primitive data types in ColumnVector
  7. Use byte[] for UnsafeArrayData and compressed data
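
To illustrate item 2, here is a minimal, hypothetical sketch of choosing a compression scheme per data type. Every name in it (Scheme, chooseScheme, and the scheme objects) is illustrative only; the objects loosely mirror the codecs in Spark's org.apache.spark.sql.execution.columnar.compression package, and this is not the code added by this PR.

import org.apache.spark.sql.types._

// Hypothetical sketch of per-type scheme selection (design item 2)
sealed trait Scheme
case object RunLengthEncoding extends Scheme // long runs of equal values
case object IntDelta extends Scheme          // small deltas between neighboring ints
case object PassThrough extends Scheme       // no compression

def chooseScheme(dataType: DataType): Scheme = dataType match {
  case BooleanType | ByteType | ShortType => RunLengthEncoding
  case IntegerType                        => IntDelta
  case _                                  => PassThrough // e.g. DoubleType, StringType
}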

The advantages of this PR are improved runtime performance and refactoring toward common components.
For performance, this PR eliminates many virtual calls and data copies in the old code paths. In particular, for path 2-a, it avoids copying data from a columnar storage CachedBatch to a row-based iterator.
For refactoring, a follow-up PR may remove components that become unused for CachedBatchBytes.

Options

Every ColumnVector for a primitive data type in a ColumnarBatch can be compressed. Currently, there are two ways to enable compression (both are shown in the sketch after this list):

  1. Set the property spark.sql.inMemoryColumnarStorage.compressed to true (the default), or
  2. Call DataFrame.persist(st), where st is MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, MEMORY_AND_DISK_SER, or MEMORY_AND_DISK_SER_2.

The compression scheme is specified by the property spark.sql.inMemoryColumnarStorage.compression.codec (default: lz4).
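
As a usage sketch (assuming a running SparkSession named spark and an existing DataFrame df; both names are illustrative):

import org.apache.spark.storage.StorageLevel

// Way 1: enable compression through the SQL property (true is the default)
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
// Optionally choose the codec used for the compressed ColumnVectors
spark.conf.set("spark.sql.inMemoryColumnarStorage.compression.codec", "lz4")

// Way 2: request a serialized storage level explicitly
df.persist(StorageLevel.MEMORY_ONLY_SER)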

Performance results
3.4x for building a CachedColumnarBatch and getting values (in 1 and 2-a)

ToDo: a follow-up PR will implement compression for complex data types such as arrays

OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Cache random keys:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
cache = T columnarBatch = F compress = T      7211 / 7366          2.3         429.8       1.0X
cache = T columnarBatch = F compress = F      2381 / 2460          7.0         141.9       3.0X
cache = F                                      137 /  140        122.7           8.1      52.7X
cache = T columnarBatch = T compress = T      2109 / 2252          8.0         125.7       3.4X
cache = T columnarBatch = T compress = F      1126 / 1184         14.9          67.1       6.4X

1.2x for getting values (only in 2-a)

OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Cache random keys:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
cache = T columnarBatch = F compress = T      1615 / 1655         41.5          24.1       1.0X
cache = T columnarBatch = F compress = F      1603 / 1690         41.9          23.9       1.0X
cache = F                                      444 /  449        151.3           6.6       3.6X
cache = T columnarBatch = T compress = T      1404 / 1526         47.8          20.9       1.2X
cache = T columnarBatch = T compress = F       116 /  125        579.0           1.7      13.9X

Here is an example program:

// assumes an environment (e.g. spark-shell or a Spark test suite) where
// sparkContext and the implicits needed for toDF are in scope
val df = sparkContext.parallelize((1 to 10), 1).map(i => (i, i.toDouble)).toDF("i", "d").cache
df.filter("i < 8 and 4.0 < d").show

1. Generated code for building CachedColumnarBatch

/* 001 */ import org.apache.spark.memory.MemoryMode;
/* 002 */ import org.apache.spark.sql.catalyst.InternalRow;
/* 003 */ import org.apache.spark.sql.execution.columnar.CachedColumnarBatch;
/* 004 */ import org.apache.spark.sql.execution.columnar.GenerateColumnarBatch;
/* 005 */ import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
/* 006 */ import org.apache.spark.sql.execution.vectorized.ColumnVector;
/* 007 */ import org.apache.spark.sql.execution.vectorized.OnHeapUnsafeColumnVector;
/* 008 */
/* 009 */ public GeneratedColumnarBatchIterator generate(Object[] references) {
/* 010 */   return new GeneratedColumnarBatchIterator(references);
/* 011 */ }
/* 012 */
/* 013 */ class GeneratedColumnarBatchIterator extends org.apache.spark.sql.execution.columnar.ColumnarBatchIterator {
/* 014 */   private long bytesInBatch;
/* 015 */   private java.util.Iterator rowIterator;
/* 016 */   private org.apache.spark.sql.types.StructType schema;
/* 017 */   private org.apache.spark.sql.execution.columnar.IntColumnStats colStat0;
/* 018 */   private org.apache.spark.sql.execution.columnar.DoubleColumnStats colStat1;
/* 019 */   private org.apache.spark.SparkConf conf;
/* 020 */
/* 021 */   public GeneratedColumnarBatchIterator(Object[] references) {
/* 022 */     bytesInBatch = 0;
/* 023 */     this.rowIterator = (java.util.Iterator) references[0];
/* 024 */     this.schema = (org.apache.spark.sql.types.StructType) references[1];
/* 025 */
/* 026 */
/* 027 */     this.conf = (org.apache.spark.SparkConf) references[2];
/* 028 */   }
/* 029 */
/* 030 */
/* 031 */
/* 032 */   org.apache.spark.sql.execution.columnar.ColumnStats[] statsArray = new org.apache.spark.sql.execution.columnar.ColumnStats[2];
/* 033 */   private void allocateColumnStats() {
/* 034 */     InternalRow row = null;
/* 035 */     colStat0 = new org.apache.spark.sql.execution.columnar.IntColumnStats(); statsArray[0] = colStat0;
/* 036 */     colStat1 = new org.apache.spark.sql.execution.columnar.DoubleColumnStats(); statsArray[1] = colStat1;
/* 037 */   }
/* 038 */
/* 039 */   @Override
/* 040 */   public boolean hasNext() {
/* 041 */     return rowIterator.hasNext();
/* 042 */   }
/* 043 */
/* 044 */   @Override
/* 045 */   public CachedColumnarBatch next() {
/* 046 */     ColumnarBatch columnarBatch =
/* 047 */     ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP_UNSAFE, 10000);
/* 048 */     allocateColumnStats();
/* 049 */     int rowNum = 0;
/* 050 */     bytesInBatch = 0;
/* 051 */     while (rowIterator.hasNext() && rowNum < 10000 && bytesInBatch < 4194304) {
/* 052 */       InternalRow row = (InternalRow) rowIterator.next();
/* 053 */       if (row.isNullAt(0)) {
/* 054 */         columnarBatch.column(0).putNull(rowNum);
/* 055 */         colStat0.gatherNullStats();
/* 056 */       } else {
/* 057 */         int val = row.getInt(0);
/* 058 */         columnarBatch.column(0).putInt(rowNum, val);
/* 059 */         bytesInBatch += 4;
/* 060 */         colStat0.gatherValueStats(val);
/* 061 */       }
/* 062 */       if (row.isNullAt(1)) {
/* 063 */         columnarBatch.column(1).putNull(rowNum);
/* 064 */         colStat1.gatherNullStats();
/* 065 */       } else {
/* 066 */         double val = row.getDouble(1);
/* 067 */         columnarBatch.column(1).putDouble(rowNum, val);
/* 068 */         bytesInBatch += 8;
/* 069 */         colStat1.gatherValueStats(val);
/* 070 */       }
/* 071 */
/* 072 */       rowNum += 1;
/* 073 */     }
/* 074 */     columnarBatch.setNumRows(rowNum);
/* 075 */     for (int i = 0; i < 2; i++) {
/* 076 */       ((OnHeapUnsafeColumnVector)columnarBatch.column(i)).compress(conf);
/* 077 */     }
/* 078 */     return CachedColumnarBatch.apply(
/* 079 */       columnarBatch, GenerateColumnarBatch.generateStats(statsArray));
/* 080 */   }
/* 081 */ }
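
As the generated next() above shows, a batch is filled until it reaches 10,000 rows or 4194304 bytes (4 MB); per-column statistics are gathered along the way, and each ColumnVector is compressed before the CachedColumnarBatch is returned.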

2.a Generated code for getting values from CachedColumnarBatch in code generated by whole-stage codegen (primary path)

/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator inmemorytablescan_input;
/* 008 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_numOutputRows;
/* 009 */   private org.apache.spark.sql.execution.metric.SQLMetric inmemorytablescan_scanTime;
/* 010 */   private long inmemorytablescan_scanTime1;
/* 011 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch inmemorytablescan_batch;
/* 012 */   private org.apache.spark.SparkConf inmemorytablescan_conf;
/* 013 */   private int inmemorytablescan_batchIdx;
/* 014 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance0;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnVector inmemorytablescan_colInstance1;
/* 016 */   private UnsafeRow inmemorytablescan_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder inmemorytablescan_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter inmemorytablescan_rowWriter;
/* 019 */   private org.apache.spark.sql.execution.metric.SQLMetric filter_numOutputRows;
/* 020 */   private UnsafeRow filter_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder filter_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter filter_rowWriter;
/* 023 */
/* 024 */   public GeneratedIterator(Object[] references) {
/* 025 */     this.references = references;
/* 026 */   }
/* 027 */
/* 028 */   public void init(int index, scala.collection.Iterator inputs[]) {
/* 029 */     partitionIndex = index;
/* 030 */     inmemorytablescan_input = inputs[0];
/* 031 */     this.inmemorytablescan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 032 */     this.inmemorytablescan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 033 */     inmemorytablescan_scanTime1 = 0;
/* 034 */     inmemorytablescan_batch = null;
/* 035 */     this.inmemorytablescan_conf = (org.apache.spark.SparkConf) references[2];
/* 036 */     inmemorytablescan_batchIdx = 0;
/* 037 */     inmemorytablescan_colInstance0 = null;
/* 038 */     inmemorytablescan_colInstance1 = null;
/* 039 */     inmemorytablescan_result = new UnsafeRow(2);
/* 040 */     this.inmemorytablescan_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(inmemorytablescan_result, 0);
/* 041 */     this.inmemorytablescan_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(inmemorytablescan_holder, 2);
/* 042 */     this.filter_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 043 */     filter_result = new UnsafeRow(2);
/* 044 */     this.filter_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(filter_result, 0);
/* 045 */     this.filter_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(filter_holder, 2);
/* 046 */   }
/* 047 */
/* 048 */   private void inmemorytablescan_nextBatch() throws java.io.IOException {
/* 049 */     long getBatchStart = System.nanoTime();
/* 050 */     if (inmemorytablescan_input.hasNext()) {
/* 051 */       inmemorytablescan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)inmemorytablescan_input.next();
/* 052 */
/* 053 */       inmemorytablescan_numOutputRows.add(inmemorytablescan_batch.numRows());
/* 054 */       inmemorytablescan_batchIdx = 0;
/* 055 */       inmemorytablescan_colInstance0 = inmemorytablescan_batch.column(0); ((org.apache.spark.sql.execution.vectorized.OnHeapUnsafeColumnVector)inmemorytablescan_colInstance0).decompress(inmemorytablescan_conf);
/* 056 */       inmemorytablescan_colInstance1 = inmemorytablescan_batch.column(1); ((org.apache.spark.sql.execution.vectorized.OnHeapUnsafeColumnVector)inmemorytablescan_colInstance1).decompress(inmemorytablescan_conf);
/* 057 */
/* 058 */     }
/* 059 */     inmemorytablescan_scanTime1 += System.nanoTime() - getBatchStart;
/* 060 */   }
/* 061 */
/* 062 */   protected void processNext() throws java.io.IOException {
/* 063 */     if (inmemorytablescan_batch == null) {
/* 064 */       inmemorytablescan_nextBatch();
/* 065 */     }
/* 066 */     while (inmemorytablescan_batch != null) {
/* 067 */       int numRows = inmemorytablescan_batch.numRows();
/* 068 */       while (inmemorytablescan_batchIdx < numRows) {
/* 069 */         int inmemorytablescan_rowIdx = inmemorytablescan_batchIdx++;
/* 070 */         boolean inmemorytablescan_isNull = inmemorytablescan_colInstance0.isNullAt(inmemorytablescan_rowIdx);
/* 071 */         int inmemorytablescan_value = inmemorytablescan_isNull ? -1 : (inmemorytablescan_colInstance0.getInt(inmemorytablescan_rowIdx));
/* 072 */
/* 073 */         if (!(!(inmemorytablescan_isNull))) continue;
/* 074 */
/* 075 */         boolean filter_isNull2 = false;
/* 076 */
/* 077 */         boolean filter_value2 = false;
/* 078 */         filter_value2 = inmemorytablescan_value < 8;
/* 079 */         if (!filter_value2) continue;
/* 080 */         boolean inmemorytablescan_isNull1 = inmemorytablescan_colInstance1.isNullAt(inmemorytablescan_rowIdx);
/* 081 */         double inmemorytablescan_value1 = inmemorytablescan_isNull1 ? -1.0 : (inmemorytablescan_colInstance1.getDouble(inmemorytablescan_rowIdx));
/* 082 */
/* 083 */         if (!(!(inmemorytablescan_isNull1))) continue;
/* 084 */
/* 085 */         boolean filter_isNull7 = false;
/* 086 */
/* 087 */         boolean filter_value7 = false;
/* 088 */         filter_value7 = org.apache.spark.util.Utils.nanSafeCompareDoubles(4.0D, inmemorytablescan_value1) < 0;
/* 089 */         if (!filter_value7) continue;
/* 090 */
/* 091 */         filter_numOutputRows.add(1);
/* 092 */
/* 093 */         filter_rowWriter.write(0, inmemorytablescan_value);
/* 094 */
/* 095 */         filter_rowWriter.write(1, inmemorytablescan_value1);
/* 096 */         append(filter_result);
/* 097 */         if (shouldStop()) return;
/* 098 */       }
/* 099 */       inmemorytablescan_batch = null;
/* 100 */       inmemorytablescan_nextBatch();
/* 101 */     }
/* 102 */     inmemorytablescan_scanTime.add(inmemorytablescan_scanTime1 / (1000 * 1000));
/* 103 */     inmemorytablescan_scanTime1 = 0;
/* 104 */   }
/* 105 */ }

2.b Generated code for copying values from CachedColumnarBatch in a generated iterator

/* 001 */ import scala.collection.Iterator;
/* 002 */ import org.apache.spark.sql.types.DataType;
/* 003 */ import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
/* 004 */ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
/* 005 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
/* 006 */ import org.apache.spark.sql.execution.vectorized.ColumnVector;
/* 007 */ import org.apache.spark.sql.execution.vectorized.OnHeapUnsafeColumnVector;
/* 008 */
/* 009 */ public SpecificColumnarIterator generate(Object[] references) {
/* 010 */   return new SpecificColumnarIterator(references);
/* 011 */ }
/* 012 */
/* 013 */ class SpecificColumnarIterator extends org.apache.spark.sql.execution.columnar.ColumnarIterator {
/* 014 */   private ColumnVector[] colInstances;
/* 015 */   private UnsafeRow unsafeRow = new UnsafeRow(2);
/* 016 */   private BufferHolder bufferHolder = new BufferHolder(unsafeRow);
/* 017 */   private UnsafeRowWriter rowWriter = new UnsafeRowWriter(bufferHolder, 2);
/* 018 */   private MutableUnsafeRow mutableRow = null;
/* 019 */
/* 020 */   private int rowIdx = 0;
/* 021 */   private int numRowsInBatch = 0;
/* 022 */
/* 023 */   private scala.collection.Iterator input = null;
/* 024 */   private DataType[] columnTypes = null;
/* 025 */   private int[] columnIndexes = null;
/* 026 */
/* 027 */   private org.apache.spark.SparkConf conf;
/* 028 */
/* 029 */   public SpecificColumnarIterator(Object[] references) {
/* 030 */     this.conf = (org.apache.spark.SparkConf) references[0];
/* 031 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 032 */   }
/* 033 */
/* 034 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
/* 035 */     this.input = input;
/* 036 */     this.columnTypes = columnTypes;
/* 037 */     this.columnIndexes = columnIndexes;
/* 038 */   }
/* 039 */
/* 040 */
/* 041 */
/* 042 */   public boolean hasNext() {
/* 043 */     if (rowIdx < numRowsInBatch) {
/* 044 */       return true;
/* 045 */     }
/* 046 */     if (!input.hasNext()) {
/* 047 */       return false;
/* 048 */     }
/* 049 */
/* 050 */     org.apache.spark.sql.execution.columnar.CachedColumnarBatch cachedBatch =
/* 051 */     (org.apache.spark.sql.execution.columnar.CachedColumnarBatch) input.next();
/* 052 */     org.apache.spark.sql.execution.vectorized.ColumnarBatch batch = cachedBatch.columnarBatch();
/* 053 */     rowIdx = 0;
/* 054 */     numRowsInBatch = cachedBatch.getNumRows();
/* 055 */     colInstances = new ColumnVector[columnIndexes.length];
/* 056 */     for (int i = 0; i < columnIndexes.length; i ++) {
/* 057 */       colInstances[i] = batch.column(columnIndexes[i]);
/* 058 */       ((OnHeapUnsafeColumnVector)colInstances[i]).decompress(conf);
/* 059 */     }
/* 060 */
/* 061 */     return hasNext();
/* 062 */   }
/* 063 */
/* 064 */   public InternalRow next() {
/* 065 */     bufferHolder.reset();
/* 066 */     rowWriter.zeroOutNullBytes();
/* 067 */     InternalRow row = null;
/* 068 */
/* 069 */     if (colInstances[0].isNullAt(rowIdx)) {
/* 070 */       mutableRow.setNullAt(0);
/* 071 */     } else {
/* 072 */       mutableRow.setInt(0, colInstances[0].getInt(rowIdx));
/* 073 */     }
/* 074 */
/* 075 */     if (colInstances[1].isNullAt(rowIdx)) {
/* 076 */       mutableRow.setNullAt(1);
/* 077 */     } else {
/* 078 */       mutableRow.setDouble(1, colInstances[1].getDouble(rowIdx));
/* 079 */     }
/* 080 */
/* 081 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 082 */     rowIdx += 1;
/* 083 */     return unsafeRow;
/* 084 */   }
/* 085 */ }
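
In this path, hasNext pulls the next CachedColumnarBatch, decompresses only the requested columns, and calls itself again; next then copies each value into an UnsafeRow through MutableUnsafeRow. This per-value copy is exactly what the primary path 2-a avoids.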

How was this patch tested?

Added new tests

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65835 has finished for PR 15219 at commit a3b1c17.

  • This patch fails Scala style tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class ColumnVector implements AutoCloseable, Serializable
    • public final class ColumnarBatch implements Serializable
    • public final class OnHeapUnsafeColumnVector extends ColumnVector implements Serializable

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65837 has finished for PR 15219 at commit fd835a1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2016

Test build #65838 has finished for PR 15219 at commit dbc5536.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2016

Test build #65870 has finished for PR 15219 at commit d534359.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 24, 2016

Test build #65871 has finished for PR 15219 at commit aee597c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk changed the title from "[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values when DataFrame.cache() is called" to "[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called" on Sep 24, 2016
@SparkQA

SparkQA commented Sep 24, 2016

Test build #65872 has finished for PR 15219 at commit 60a4616.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 25, 2016

Test build #65884 has finished for PR 15219 at commit ab268f9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 25, 2016

Test build #65887 has finished for PR 15219 at commit cd5ade5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 27, 2016

Test build #65973 has finished for PR 15219 at commit 9d318d3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66081 has finished for PR 15219 at commit 36cdd15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GenerateColumnAccessor(conf: SparkConf)
    • class SpecificColumnarIterator extends $

kiszk changed the title from "[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called" to "[SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called" on Sep 29, 2016
@SparkQA

SparkQA commented Sep 29, 2016

Test build #66085 has finished for PR 15219 at commit 58f2257.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Sep 29, 2016

Jenkins, retest this please

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66111 has finished for PR 15219 at commit e12a0a5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 29, 2016

Test build #66122 has finished for PR 15219 at commit e12a0a5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 30, 2016

Test build #66179 has finished for PR 15219 at commit e06f04c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Sep 30, 2016

@davies would it be possible to review this?
As you suggested, I accomplished:

  1. code generation for building CachedColumnarBatch and getting data from CachedColumnarBatch
  2. compression of each ColumnVector in CachedColumnarBatch using LZ4
  3. a good performance improvement

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66314 has finished for PR 15219 at commit 8fb3591.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 4, 2016

Test build #66325 has finished for PR 15219 at commit 588590b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Oct 6, 2016

@davies could you please review this? cc @cloud-fan

@SparkQA

SparkQA commented Oct 8, 2016

Test build #66575 has finished for PR 15219 at commit a532d7d.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 20, 2017

Test build #71692 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 20, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 20, 2017

Test build #71711 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 20, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 20, 2017

Test build #71719 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 20, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 21, 2017

Test build #71745 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 21, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 21, 2017

Test build #71765 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 22, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 22, 2017

Test build #71787 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jan 23, 2017

Jenkins, retest this please

@SparkQA

SparkQA commented Jan 23, 2017

Test build #71832 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?

This pull request adds test cases for the following cases:
- keep all data types with null or without null
- access `CachedBatch` disabling whole stage codegen
- access only some columns in `CachedBatch`

This PR is a part of apache#15219. Here are the motivations for adding these tests: when apache#15219 is enabled, the first two cases are handled by specialized (generated) code, and the third one is a pitfall.

In general, it is helpful to increase test coverage even now.
## How was this patch tested?

Added the test suites themselves

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#15462 from kiszk/columnartestsuites.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…ctor/ColumnarBatch

## What changes were proposed in this pull request?

This PR refactors the code generation that gets data from `ColumnarVector` and `ColumnarBatch` into a trait `ColumnarBatchScan` for ease of reuse, because this part will be reused by several components (e.g. the Parquet reader, Dataset.cache, and others) once `ColumnarBatch` becomes a first-class citizen.

This PR is a part of apache#15219. In advance of it, this PR makes the code generation for `ColumnarVector` and `ColumnarBatch` reusable as a trait. In general, this is also very useful for other components from a reusability standpoint.
## How was this patch tested?

tested existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#15467 from kiszk/columnarrefactor.
@SparkQA

SparkQA commented Feb 7, 2017

Test build #72538 has finished for PR 15219 at commit b15d9d5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 15, 2017
kiszk changed the title from "[SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called" to "[WIP][SPARK-14098][SQL] Generate Java code to build CachedColumnarBatch and get values from CachedColumnarBatch when DataFrame.cache() is called" on May 16, 2017
@jiangxb1987
Contributor

@kiszk This PR has been stale for a long time. If we don't plan to continue working on it in the near future, could you please temporarily close it? We can always reopen the PR when it's a good time to restart working on the issue, WDYT? Thanks!

@kiszk
Member Author

kiszk commented Oct 3, 2017

@jiangxb1987 Thank you for pinging me. Sure, since we are working on this feature in other PRs, I will close this.

kiszk closed this Oct 3, 2017